diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java index 2235b9a7128b8..d7c0d0adb3afc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; import java.io.IOException; +import java.util.concurrent.CompletionException; public class PulsarServerException extends IOException { private static final long serialVersionUID = 1; @@ -44,4 +45,20 @@ public NotFoundException(Throwable t) { super(t); } } + + public static PulsarServerException from(Throwable throwable) { + if (throwable instanceof CompletionException) { + return from(throwable.getCause()); + } + if (throwable instanceof PulsarServerException pulsarServerException) { + return pulsarServerException; + } else { + return new PulsarServerException(throwable); + } + } + + // Wrap this checked exception into a specific unchecked exception + public static CompletionException toUncheckedException(PulsarServerException e) { + return new CompletionException(e); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dd66fc957d8c7..334eabeacea62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -960,7 +960,7 @@ public void start() throws PulsarServerException { state = State.Started; } catch (Exception e) { LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e); - PulsarServerException startException = new PulsarServerException(e); + PulsarServerException startException = PulsarServerException.from(e); readyForIncomingRequestsFuture.completeExceptionally(startException); throw startException; } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index e0fd738a408e2..af0ea0ea224d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -80,7 +80,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; -import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; @@ -97,10 +96,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -123,10 +119,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; - public static final int STARTUP_TIMEOUT_SECONDS = 30; - - public static final int MAX_RETRY = 5; - private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set INTERNAL_TOPICS = @@ -204,7 +196,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private final ConcurrentHashMap>> lookupRequests = new ConcurrentHashMap<>(); - private final CompletableFuture initWaiter = new CompletableFuture<>(); + private final CompletableFuture initWaiter = new CompletableFuture<>(); /** * Get all the bundles that are owned by this broker. @@ -331,7 +323,7 @@ public void start() throws PulsarServerException { return; } try { - this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.brokerRegistry = createBrokerRegistry(pulsar); this.leaderElectionService = new LeaderElectionService( pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, @@ -346,53 +338,14 @@ public void start() throws PulsarServerException { }); }); }); - this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar); + this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar); this.brokerRegistry.start(); this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter); this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); - pulsar.runWhenReadyForIncomingRequests(() -> { - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) - .create(); - int retry = 0; - while (!Thread.currentThread().isInterrupted()) { - try { - brokerRegistry.register(); - this.serviceUnitStateChannel.start(); - break; - } catch (Exception e) { - log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", - pulsar.getBrokerId(), ++retry, e); - try { - Thread.sleep(backoff.next()); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - // preserve thread's interrupt status - Thread.currentThread().interrupt(); - try { - pulsar.close(); - } catch (PulsarServerException exc) { - log.error("Failed to close pulsar service.", exc); - } - return; - } - failStarting(e); - if (retry >= MAX_RETRY) { - log.error("Failed to start the service unit state channel after retry {} th. " - + "Closing pulsar service.", retry, e); - try { - pulsar.close(); - } catch (PulsarServerException ex) { - log.error("Failed to close pulsar service.", ex); - } - } - } - } - }); + this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); @@ -401,15 +354,10 @@ public void start() throws PulsarServerException { SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - - try { - this.brokerLoadDataStore = LoadDataStoreFactory - .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.topBundlesLoadDataStore = LoadDataStoreFactory - .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); - } catch (LoadDataStoreException e) { - throw new PulsarServerException(e); - } + this.brokerLoadDataStore = LoadDataStoreFactory + .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + this.topBundlesLoadDataStore = LoadDataStoreFactory + .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); this.context = LoadManagerContextImpl.builder() .configuration(conf) @@ -433,6 +381,7 @@ public void start() throws PulsarServerException { pulsar.runWhenReadyForIncomingRequests(() -> { try { + this.serviceUnitStateChannel.start(); var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() @@ -467,38 +416,33 @@ public void start() throws PulsarServerException { MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.splitScheduler.start(); - this.initWaiter.complete(null); + this.initWaiter.complete(true); this.started = true; log.info("Started load manager."); - } catch (Exception ex) { - failStarting(ex); + } catch (Throwable e) { + failStarting(e); } }); - } catch (Exception ex) { + } catch (Throwable ex) { failStarting(ex); } } - private void failStarting(Exception ex) { - log.error("Failed to start the extensible load balance and close broker registry {}.", - this.brokerRegistry, ex); + private void failStarting(Throwable throwable) { if (this.brokerRegistry != null) { try { - brokerRegistry.unregister(); - } catch (MetadataStoreException e) { - // ignore - } - } - if (this.serviceUnitStateChannel != null) { - try { - serviceUnitStateChannel.close(); - } catch (IOException e) { - // ignore + brokerRegistry.close(); + } catch (PulsarServerException e) { + // If close failed, this broker might still exist in the metadata store. Then it could be found by other + // brokers as an available broker. Hence, print a warning log for it. + log.warn("Failed to close the broker registry: {}", e.getMessage()); } } - initWaiter.completeExceptionally(ex); + initWaiter.complete(false); // exit the background thread gracefully + throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable)); } + @Override public void initialize(PulsarService pulsar) { this.pulsar = pulsar; @@ -843,7 +787,9 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; @@ -893,7 +839,9 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; @@ -957,7 +905,9 @@ public List getMetrics() { @VisibleForTesting protected void monitor() { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } // Monitor role // Periodically check the role in case ZK watcher fails. @@ -1012,4 +962,14 @@ private void closeInternalTopics() { log.warn("Failed to wait for closing internal topics", e); } } + + @VisibleForTesting + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + return new BrokerRegistryImpl(pulsar); + } + + @VisibleForTesting + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + return new ServiceUnitStateChannelImpl(pulsar); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java new file mode 100644 index 0000000000000..a400bf733e557 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.common.util.PortManager; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class LoadManagerFailFastTest { + + private static final String cluster = "test"; + private final int zkPort = PortManager.nextLockedFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final ServiceConfiguration config = new ServiceConfiguration(); + + @BeforeClass + protected void setup() throws Exception { + bk.start(); + config.setClusterName(cluster); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:localhost:" + zkPort); + } + + @AfterClass + protected void cleanup() throws Exception { + bk.stop(); + } + + @Test(timeOut = 30000) + public void testBrokerRegistryFailure() throws Exception { + config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry"); + } + Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get() + .isEmpty()); + } + + @Test(timeOut = 30000) + public void testServiceUnitStateChannelFailure() throws Exception { + config.setLoadManagerClassName(ChannelLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel"); + } + Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore() + .getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty())); + } + + private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + return mockBrokerRegistry; + } + } + + private static class ChannelLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any()); + return channel; + } + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java index 1e34c3e4fe706..e08d1f0241b55 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java @@ -32,7 +32,9 @@ import java.io.RandomAccessFile; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.StandardCopyOption; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.Base64; @@ -86,19 +88,32 @@ static File doUnpackNar(final File nar, final File baseWorkingDirectory, Runnabl try (FileChannel channel = new RandomAccessFile(lockFile, "rw").getChannel(); FileLock lock = channel.lock()) { File narWorkingDirectory = new File(parentDirectory, md5Sum); - if (narWorkingDirectory.mkdir()) { + if (!narWorkingDirectory.exists()) { + File narExtractionTempDirectory = new File(parentDirectory, md5Sum + ".tmp"); + if (narExtractionTempDirectory.exists()) { + FileUtils.deleteFile(narExtractionTempDirectory, true); + } + if (!narExtractionTempDirectory.mkdir()) { + throw new IOException("Cannot create " + narExtractionTempDirectory); + } try { - log.info("Extracting {} to {}", nar, narWorkingDirectory); + log.info("Extracting {} to {}", nar, narExtractionTempDirectory); if (extractCallback != null) { extractCallback.run(); } - unpack(nar, narWorkingDirectory); + unpack(nar, narExtractionTempDirectory); } catch (IOException e) { log.error("There was a problem extracting the nar file. Deleting {} to clean up state.", - narWorkingDirectory, e); - FileUtils.deleteFile(narWorkingDirectory, true); + narExtractionTempDirectory, e); + try { + FileUtils.deleteFile(narExtractionTempDirectory, true); + } catch (IOException e2) { + log.error("Failed to delete temporary directory {}", narExtractionTempDirectory, e2); + } throw e; } + Files.move(narExtractionTempDirectory.toPath(), narWorkingDirectory.toPath(), + StandardCopyOption.ATOMIC_MOVE); } return narWorkingDirectory; } @@ -166,7 +181,7 @@ private static void makeFile(final InputStream inputStream, final File file) thr * @throws IOException * if cannot read file */ - private static byte[] calculateMd5sum(final File file) throws IOException { + protected static byte[] calculateMd5sum(final File file) throws IOException { try (final FileInputStream inputStream = new FileInputStream(file)) { final MessageDigest md5 = MessageDigest.getInstance("md5"); @@ -183,4 +198,4 @@ private static byte[] calculateMd5sum(final File file) throws IOException { throw new IllegalArgumentException(nsae); } } -} +} \ No newline at end of file diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java index a1f915c8b7828..1c3a2c276537b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java @@ -118,6 +118,17 @@ public static void main(String[] args) { } } + @Test + void shouldReExtractWhenUnpackedDirectoryIsMissing() throws IOException { + AtomicInteger extractCounter = new AtomicInteger(); + + File narWorkingDirectory = NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet); + FileUtils.deleteFile(narWorkingDirectory, true); + NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, extractCounter::incrementAndGet); + + assertEquals(extractCounter.get(), 2); + } + @Test void shouldExtractFilesOnceInDifferentProcess() throws InterruptedException { int processes = 5; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java index 8e744e3b1229c..c396fbb85e24e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java @@ -64,6 +64,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.UUID; import org.apache.commons.lang3.tuple.Pair; /** @@ -276,6 +277,10 @@ public static double getDouble(JsonNode value, Type type, String columnName) { private static Slice getSlice(JsonNode value, Type type, String columnName) { String textValue = value.isValueNode() ? value.asText() : value.toString(); + if (type instanceof UuidType) { + return UuidType.javaUuidToTrinoUuid(UUID.fromString(textValue)); + } + Slice slice = utf8Slice(textValue); if (type instanceof VarcharType) { slice = truncateToLength(slice, type); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java index 32e71a53444cf..a5207d0012457 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java @@ -144,7 +144,7 @@ public void testPrimitiveType() { PulsarColumnHandle uuidHandle = new PulsarColumnHandle(getPulsarConnectorId().toString(), "uuidField", UuidType.UUID, false, false, "uuidField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE); - checkValue(decodedRow, uuidHandle, message.uuidField.toString()); + checkValue(decodedRow, uuidHandle, UuidType.javaUuidToTrinoUuid(message.uuidField)); } @Test diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java index 357fd8724d738..61acdae37696b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMsSqlContainer.java @@ -37,7 +37,7 @@ public class DebeziumMsSqlContainer extends ChaosContainer