diff --git a/pom.xml b/pom.xml index 720c9db..385b1ac 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ - 2.10.2 + 3.3.2 2.12.17 2.12 3.2.14 @@ -153,6 +153,11 @@ commons-io ${commons-io.version} + + org.bouncycastle + bcprov-jdk18on + 1.72 + @@ -390,6 +395,7 @@ org.bouncycastle*:* org.lz4*:* commons-io:commons-io:jar:* + io.opentelemetry:* @@ -409,6 +415,10 @@ + + io.opentelemetry + org.apache.pulsar.shade.io.opentelemetry + com.google org.apache.pulsar.shade.com.google diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala index 9c77aec..39c1386 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala @@ -18,12 +18,8 @@ import java.util.Locale import scala.reflect._ -import org.apache.pulsar.client.impl.conf.{ - ClientConfigurationData, - ProducerConfigurationData, - ReaderConfigurationData -} -import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.annotation.JsonIgnore +import org.apache.pulsar.client.impl.conf.{ClientConfigurationData, ProducerConfigurationData, ReaderConfigurationData} object PulsarConfigurationUtils { diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala index 6135659..3ac6005 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala @@ -309,8 +309,10 @@ private[pulsar] case class PulsarHelper( private def getTopics(topicsPattern: String): Seq[String] = { val dest = TopicName.get(topicsPattern) val allTopics: ju.List[String] = client.getLookup - .getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL) - .get() + .getTopicsUnderNamespace( + // passing an empty topicsHash because we don't cache the GetTopicsResponse + dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL, topicsPattern, "") + .get().getTopics val allNonPartitionedTopics: ju.List[String] = allTopics.asScala .filter(t => !TopicName.get(t).isPartitioned) @@ -345,7 +347,9 @@ private[pulsar] case class PulsarHelper( while (waitList.nonEmpty) { val topic = waitList.head try { - client.getPartitionedTopicMetadata(topic).get() + // setting metadataAutoCreationEnabled to false, and useFallbackForNonPIP344Brokers + // to true to conform to non-breaking behavior. + client.getPartitionedTopicMetadata(topic, false, true).get() waitList -= topic } catch { case NonFatal(_) => diff --git a/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala index 140fd5c..7b3bd2c 100644 --- a/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala +++ b/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala @@ -3,8 +3,9 @@ package org.apache.spark.sql.pulsar import java.{util => ju} import org.apache.pulsar.client.admin.PulsarAdmin +import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException import org.apache.pulsar.client.api.MessageId -import org.apache.pulsar.client.internal.DefaultImplementation + import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId} import org.apache.spark.sql.streaming.Trigger.{Once, ProcessingTime} import org.apache.spark.util.Utils @@ -48,10 +49,9 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest { // Need to call latestOffsetForTopicPartition so the helper instantiates // the admin val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, conf) - val e = intercept[RuntimeException] { + intercept[NotFoundException] { admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt) } - assert(e.getMessage.contains("Failed to load config into existing configuration data")) } test("Admit entry in the middle of the ledger") {