Skip to content

Commit

Permalink
[fix] Remove blocking calls from BookieRackAffinityMapping (apache#22846
Browse files Browse the repository at this point in the history
)

(cherry picked from commit aece67e)
  • Loading branch information
merlimat authored and nodece committed Aug 15, 2024
1 parent 31c6312 commit 4402ce5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();

public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
static MetadataStore getMetadataStore(Configuration conf) throws MetadataException {
MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty != null) {
Expand Down Expand Up @@ -115,12 +115,20 @@ public synchronized void setConf(Configuration conf) {
super.setConf(conf);
MetadataStore store;
try {
store = createMetadataStore(conf);
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
store = getMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}

bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);

try {
var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
.get();

for (var bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
}
Expand All @@ -130,10 +138,12 @@ public synchronized void setConf(Configuration conf) {
}
}
updateRacksWithHost(racksWithHost);
watchAvailableBookies();
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to update rack info. ", e);
throw new RuntimeException(e);
}

watchAvailableBookies();
}

private void watchAvailableBookies() {
Expand All @@ -144,13 +154,13 @@ private void watchAvailableBookies() {
field.setAccessible(true);
RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
registrationClient.watchWritableBookies(versioned -> {
try {
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Failed to update rack info. ", e);
}
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new))
.thenAccept(this::updateRacksWithHost)
.exceptionally(ex -> {
LOG.error("Failed to update rack info. ", ex);
return null;
});
});
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.error("Failed watch available bookies.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {
MetadataStore store;
try {
store = BookieRackAffinityMapping.createMetadataStore(conf);
store = BookieRackAffinityMapping.getMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
}
Expand Down

0 comments on commit 4402ce5

Please sign in to comment.