From 4402ce55be9631f00b7976b444e993949e7286a8 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 5 Jun 2024 10:49:00 -0700 Subject: [PATCH] [fix] Remove blocking calls from BookieRackAffinityMapping (#22846) (cherry picked from commit aece67e35ecec4a9d90a951b78cfc89ca6395054) --- .../BookieRackAffinityMapping.java | 44 ++++++++++++------- ...IsolatedBookieEnsemblePlacementPolicy.java | 2 +- 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index b839b41112cd9..0ba4f5080e606 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -70,7 +70,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration(); private Map bookieInfoMap = new HashMap<>(); - public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException { + static MetadataStore getMetadataStore(Configuration conf) throws MetadataException { MetadataStore store; Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE); if (storeProperty != null) { @@ -115,12 +115,20 @@ public synchronized void setConf(Configuration conf) { super.setConf(conf); MetadataStore store; try { - store = createMetadataStore(conf); - bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - store.registerListener(this::handleUpdates); - racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() - .orElseGet(BookiesRackConfiguration::new); - for (Map bookieMapping : racksWithHost.values()) { + store = getMetadataStore(conf); + } catch (MetadataException e) { + throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); + } + + bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); + store.registerListener(this::handleUpdates); + + try { + var racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) + .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new)) + .get(); + + for (var bookieMapping : racksWithHost.values()) { for (String address : bookieMapping.keySet()) { bookieAddressListLastTime.add(BookieId.parse(address)); } @@ -130,10 +138,12 @@ public synchronized void setConf(Configuration conf) { } } updateRacksWithHost(racksWithHost); - watchAvailableBookies(); - } catch (InterruptedException | ExecutionException | MetadataException e) { - throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list"); + } catch (ExecutionException | InterruptedException e) { + LOG.error("Failed to update rack info. ", e); + throw new RuntimeException(e); } + + watchAvailableBookies(); } private void watchAvailableBookies() { @@ -144,13 +154,13 @@ private void watchAvailableBookies() { field.setAccessible(true); RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver); registrationClient.watchWritableBookies(versioned -> { - try { - racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get() - .orElseGet(BookiesRackConfiguration::new); - updateRacksWithHost(racksWithHost); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to update rack info. ", e); - } + bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH) + .thenApply(optRes -> optRes.orElseGet(BookiesRackConfiguration::new)) + .thenAccept(this::updateRacksWithHost) + .exceptionally(ex -> { + LOG.error("Failed to update rack info. ", ex); + return null; + }); }); } catch (NoSuchFieldException | IllegalAccessException e) { LOG.error("Failed watch available bookies.", e); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index 64bc59057d0f8..15b4d15762e4e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -71,7 +71,7 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) { MetadataStore store; try { - store = BookieRackAffinityMapping.createMetadataStore(conf); + store = BookieRackAffinityMapping.getMetadataStore(conf); } catch (MetadataException e) { throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized"); }