Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hadoop: fix background task thread leak #5363

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/sdktest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ jobs:
mkdir etc
chmod 777 etc
echo `hostname` > etc/nodes
echo "tom:3001" > users
echo "g1:2001:tom" > groups

- name: Sdk Test
run: |
Expand Down
51 changes: 32 additions & 19 deletions sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ var (
bridges []*Bridge
pOnce sync.Once
pushers []*push.Pusher

userGroupCache = make(map[string]map[string][]string) // name -> (user -> groups)

)

const (
Expand Down Expand Up @@ -340,7 +343,16 @@ func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.F
logger.Infof("JuiceFileSystem created for user:%s group:%s", user, group)
}
w := &wrapper{jfs, nil, m, user, superuser, supergroup}
if w.isSuperuser(user, strings.Split(group, ",")) {
var gs []string
if userGroupCache[name] != nil {
gs = userGroupCache[name][user]
}
if gs == nil {
gs = strings.Split(group, ",")
}
group = strings.Join(gs, ",")
logger.Debugf("update groups of %s to %s", user, group)
if w.isSuperuser(user, gs) {
w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0})
} else {
w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group))
Expand Down Expand Up @@ -603,11 +615,8 @@ func F(p int64) *wrapper {
}

//export jfs_update_uid_grouping
func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) {
w := F(h)
if w == nil {
return
}
func jfs_update_uid_grouping(cname, uidstr *C.char, grouping *C.char) {
name := C.GoString(cname)
var uids []pwent
if uidstr != nil {
for _, line := range strings.Split(C.GoString(uidstr), "\n") {
Expand All @@ -627,8 +636,9 @@ func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) {
logger.Debugf("Update uids mapping\n %s", buffer.String())
}

var userGroups = make(map[string][]string) // user -> groups

var gids []pwent
var groups []string
if grouping != nil {
for _, line := range strings.Split(C.GoString(grouping), "\n") {
fields := strings.Split(line, ":")
Expand All @@ -640,29 +650,32 @@ func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) {
gids = append(gids, pwent{uint32(gid), gname})
if len(fields) > 2 {
for _, user := range strings.Split(fields[len(fields)-1], ",") {
if strings.TrimSpace(user) == w.user {
groups = append(groups, gname)
}
userGroups[user] = append(userGroups[user], gname)
}
}
}
logger.Debugf("Update groups of %s to %s", w.user, strings.Join(groups, ","))
var buffer bytes.Buffer
for _, g := range gids {
buffer.WriteString(fmt.Sprintf("\t%v:%v\n", g.name, g.id))
}
logger.Debugf("Update gids mapping\n %s", buffer.String())
}
w.m.update(uids, gids, false)

if w.isSuperuser(w.user, groups) {
w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0})
} else {
gids := w.ctx.Gids()
if len(groups) > 0 {
gids = w.lookupGids(strings.Join(groups, ","))
fslock.Lock()
defer fslock.Unlock()
userGroupCache[name] = userGroups
ws := activefs[name]
if len(ws) > 0 {
m := ws[0].m
m.update(uids, gids, false)
for _, w := range ws {
logger.Debugf("Update groups of %s to %s", w.user, strings.Join(userGroups[w.user], ","))
if w.isSuperuser(w.user, userGroups[w.user]) {
w.ctx = meta.NewContext(uint32(os.Getpid()), 0, []uint32{0})
} else {
w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(w.user), w.lookupGids(strings.Join(userGroups[w.user], ",")))
}
}
w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(w.user), gids)
}
}

Expand Down
36 changes: 15 additions & 21 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class JuiceFileSystem extends FilterFileSystem {
private static boolean fileChecksumEnabled = false;
private static boolean distcpPatched = false;

private FileSystem emptierFs;

static {
PatchUtil.patchBefore("org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream",
"waitUntilLeaseIsRevoked",
Expand All @@ -68,25 +66,21 @@ private synchronized static void patchDistCpChecksum() {
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
fileChecksumEnabled = Boolean.parseBoolean(getConf(conf, "file.checksum", "false"));
if (!Boolean.parseBoolean(getConf(conf, "disable-trash-emptier", "false"))) {
startTrashEmptier(uri, conf);
}
}

private void startTrashEmptier(URI uri, final Configuration conf) throws IOException {
if (BgTaskUtil.isRunning(uri.getHost(), "Trash emptier")) {
return;
}
try {
UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs"));
emptierFs = superUser.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
JuiceFileSystemImpl fs = new JuiceFileSystemImpl();
fs.initialize(uri, conf);
return fs;
});
BgTaskUtil.startTrashEmptier(uri.getHost(), "Trash emptier", emptierFs, new Trash(emptierFs, conf).getEmptier(), TimeUnit.MINUTES.toMillis(10));
} catch (Exception e) {
throw new IOException("start trash failed!",e);
boolean asBgTask = conf.getBoolean("juicefs.internal-bg-task", false);
if (!asBgTask && !Boolean.parseBoolean(getConf(conf, "disable-trash-emptier", "false"))) {
try {
Configuration newConf = new Configuration(conf);
newConf.setBoolean("juicefs.internal-bg-task", true);
UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs"));
FileSystem emptierFs = superUser.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
JuiceFileSystemImpl fs = new JuiceFileSystemImpl();
fs.initialize(uri, newConf);
return fs;
});
BgTaskUtil.startTrashEmptier(uri.getHost(), emptierFs, new Trash(emptierFs, conf).getEmptier(), 10, TimeUnit.MINUTES);
} catch (Exception e) {
LOG.warn("start trash emptier for {} failed", uri.getHost(), e);
}
}
}

Expand Down
108 changes: 49 additions & 59 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarFile;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -110,12 +111,13 @@ static String loadVersion() {
private long handle;
private UserGroupInformation ugi;
private String homeDirPrefix = "/user";
private Map<String, String> cachedHosts = new HashMap<>(); // (ip, hostname)
private ConsistentHash<String> hash = new ConsistentHash<>(1, Collections.singletonList("localhost"));
private static final Map<String, Map<String, String>> cachedHostsForName = new ConcurrentHashMap<>(); // (name -> (ip -> hostname))
private static final Map<String, ConsistentHash<String>> hashForName = new ConcurrentHashMap<>(); // (name -> consistentHash)
private static final Map<String, FileStatus> lastFileStatus = new ConcurrentHashMap<>();

private FsPermission uMask;
private String hflushMethod;

private Map<String, FileStatus> lastFileStatus = new HashMap<>();
private static final DirectBufferPool directBufferPool = new DirectBufferPool();

private boolean metricsEnable = false;
Expand Down Expand Up @@ -143,7 +145,7 @@ static String loadVersion() {
public static interface Libjfs {
long jfs_init(String name, String jsonConf, String user, String group, String superuser, String supergroup);

void jfs_update_uid_grouping(long h, String uidstr, String grouping);
void jfs_update_uid_grouping(String name, String uidstr, String grouping);

int jfs_term(long pid, long h);

Expand Down Expand Up @@ -384,7 +386,6 @@ public void initialize(URI uri, Configuration conf) throws IOException {
groups.clear();
groups.add(supergroup);
}
String mountpoint = getConf(conf, "mountpoint", "");

synchronized (JuiceFileSystemImpl.class) {
if (callBack == null) {
Expand Down Expand Up @@ -448,10 +449,10 @@ public void initialize(URI uri, Configuration conf) throws IOException {
if (handle <= 0) {
throw new IOException("JuiceFS initialized failed for jfs://" + name);
}

initCache(conf);
refreshCache(conf);

boolean asBgTask = conf.getBoolean("juicefs.internal-bg-task", false);
if (!asBgTask) {
BgTaskUtil.register(name, handle);
}
homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
this.workingDir = getHomeDirectory();

Expand Down Expand Up @@ -514,11 +515,13 @@ public void initialize(URI uri, Configuration conf) throws IOException {
}
}

String uidFile = getConf(conf, "users", null);
if (!isEmpty(uidFile) || !isEmpty(groupingFile)) {
updateUidAndGrouping(uidFile, groupingFile);
if (!isSuperGroupFileSystem) {
refreshUidAndGrouping(uidFile, groupingFile);
if (!asBgTask && !isSuperGroupFileSystem) {
// use juicefs.users and juicefs.groups for global mapping
String uidFile = getConf(conf, "users", null);
if (!isEmpty(uidFile) || !isEmpty(groupingFile)) {
BgTaskUtil.putTask(name, "Refresh guid", () -> {
updateUidAndGrouping(uidFile, groupingFile);
}, 1, 1, TimeUnit.MINUTES);
}
}
}
Expand Down Expand Up @@ -574,38 +577,20 @@ private boolean isEmpty(String str) {

private String readFile(String file) throws IOException {
Path path = new Path(file);
URI uri = path.toUri();
FileSystem fs;
try {
URI defaultUri = getDefaultUri(getConf());
if (uri.getScheme() == null) {
uri = defaultUri;
} else {
if (uri.getAuthority() == null && (uri.getScheme().equals(defaultUri.getScheme()))) {
uri = defaultUri;
}
}
if (getScheme().equals(uri.getScheme()) &&
(name != null && name.equals(uri.getAuthority()))) {
fs = this;
} else {
fs = path.getFileSystem(getConf());
}

FileStatus lastStatus = lastFileStatus.get(file);
FileStatus lastStatus = lastFileStatus.get(file);
Configuration newConf = new Configuration(getConf());
newConf.setBoolean("juicefs.internal-bg-task", true);
try (FileSystem fs = FileSystem.newInstance(path.toUri(), newConf)) {
FileStatus status = fs.getFileStatus(path);
if (lastStatus != null && status.getModificationTime() == lastStatus.getModificationTime()
&& status.getLen() == lastStatus.getLen()) {
&& status.getLen() == lastStatus.getLen()) {
return null;
}
FSDataInputStream in = fs.open(path);
String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"));
in.close();
lastFileStatus.put(file, status);
return res;
} catch (IOException e) {
LOG.warn(String.format("read %s failed", file), e);
throw e;
try (FSDataInputStream in = fs.open(path)) {
String res = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"));
lastFileStatus.put(file, status);
return res;
}
}
}

Expand Down Expand Up @@ -635,16 +620,10 @@ private void updateUidAndGrouping(String uidFile, String groupFile) throws IOExc
grouping = readFile(groupFile);
}

lib.jfs_update_uid_grouping(handle, uidstr, grouping);
lib.jfs_update_uid_grouping(name, uidstr, grouping);
groups = Arrays.stream(group.split(",")).collect(Collectors.toSet());
}

private void refreshUidAndGrouping(String uidFile, String groupFile) {
BgTaskUtil.startScheduleTask(name, "Refresh guid", () -> {
updateUidAndGrouping(uidFile, groupFile);
}, 1, 1, TimeUnit.MINUTES);
}

private void initializeStorageIds(Configuration conf) throws IOException {
try {
Class<?> clazz = Class.forName("org.apache.hadoop.fs.BlockLocation");
Expand Down Expand Up @@ -834,24 +813,18 @@ private void initCache(Configuration conf) {
}

// if newCachedHosts are not changed, skip
if (!newCachedHosts.equals(cachedHosts)) {
if (!newCachedHosts.equals(cachedHostsForName.get(name))) {
List<String> ips = new ArrayList<>(newCachedHosts.keySet());
LOG.debug("update nodes to: " + String.join(",", ips));
this.hash = new ConsistentHash<>(100, ips);
this.cachedHosts = newCachedHosts;
hashForName.put(name, new ConsistentHash<>(100, ips));
cachedHostsForName.put(name, newCachedHosts);
}
}
} catch (Throwable e) {
LOG.warn("failed to discover nodes", e);
}
}

private void refreshCache(Configuration conf) {
BgTaskUtil.startScheduleTask(name, "Node fetcher", () -> {
initCache(conf);
}, 10, 10, TimeUnit.MINUTES);
}

private List<String> discoverNodes(String urls) {
LOG.debug("fetching nodes from {}", urls);
NodesFetcher fetcher = NodesFetcherBuilder.buildFetcher(urls, name, this);
Expand All @@ -868,7 +841,13 @@ private BlockLocation makeLocation(long code, long start, long len) {
BlockLocation blockLocation;
String[] ns = new String[cacheReplica];
String[] hs = new String[cacheReplica];
String host = cachedHosts.getOrDefault(hash.get(code + "-" + index), "localhost");

Map<String, String> cachedHosts = cachedHostsForName.get(name);
ConsistentHash<String> hash = hashForName.get(name);
String host = "localhost";
if (cachedHosts != null && hash != null) {
host = cachedHosts.get(hash.get(code + "-" + index));
}
ns[0] = host + ":50010";
hs[0] = host;
for (int i = 1; i < cacheReplica; i++) {
Expand Down Expand Up @@ -916,6 +895,11 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
String[] host = new String[]{"localhost"};
return new BlockLocation[]{new BlockLocation(name, host, 0L, file.getLen())};
}

BgTaskUtil.putTask(name, "Node fetcher", () -> {
initCache(getConf());
}, 10, 10, TimeUnit.MINUTES);

if (file.getLen() <= start + len) {
len = file.getLen() - start;
}
Expand Down Expand Up @@ -1872,6 +1856,12 @@ public void setTimes(Path p, long mtime, long atime) throws IOException {
@Override
public void close() throws IOException {
super.close();
BgTaskUtil.unregister(name, handle, () -> {
cachedHostsForName.clear();
hashForName.clear();
lastFileStatus.clear();
});
LOG.debug("close {}({})", name, handle);
lib.jfs_term(Thread.currentThread().getId(), handle);
if (metricsEnable) {
JuiceFSInstrumentation.close();
Expand Down
Loading
Loading