From e43e8a41b1ce65af11613781b9dfe201fa045685 Mon Sep 17 00:00:00 2001
From: Eric Marnadi <132308037+ericm-db@users.noreply.github.com>
Date: Thu, 7 Nov 2024 16:07:24 -0800
Subject: [PATCH] Upgrading Pulsar Version to 3.3.2 (#180)
* init
* adding jsonignore filter back
* updating exception
* adding bouncycastle ack
* adding back lines
---
pom.xml | 12 +++++++++++-
.../spark/sql/pulsar/PulsarConfigurationUtils.scala | 8 ++------
.../org/apache/spark/sql/pulsar/PulsarHelper.scala | 10 +++++++---
.../sql/pulsar/PulsarAdmissionControlSuite.scala | 6 +++---
4 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/pom.xml b/pom.xml
index 720c9db0..385b1ac8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,7 +66,7 @@
- 2.10.2
+ 3.3.2
2.12.17
2.12
3.2.14
@@ -153,6 +153,11 @@
commons-io
${commons-io.version}
+
+ org.bouncycastle
+ bcprov-jdk18on
+ 1.72
+
@@ -390,6 +395,7 @@
org.bouncycastle*:*
org.lz4*:*
commons-io:commons-io:jar:*
+ io.opentelemetry:*
@@ -409,6 +415,10 @@
+
+ io.opentelemetry
+ org.apache.pulsar.shade.io.opentelemetry
+
com.google
org.apache.pulsar.shade.com.google
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala
index 9c77aec8..39c1386e 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarConfigurationUtils.scala
@@ -18,12 +18,8 @@ import java.util.Locale
import scala.reflect._
-import org.apache.pulsar.client.impl.conf.{
- ClientConfigurationData,
- ProducerConfigurationData,
- ReaderConfigurationData
-}
-import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonIgnore
+import com.fasterxml.jackson.annotation.JsonIgnore
+import org.apache.pulsar.client.impl.conf.{ClientConfigurationData, ProducerConfigurationData, ReaderConfigurationData}
object PulsarConfigurationUtils {
diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
index 61356590..3ac60050 100644
--- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
+++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
@@ -309,8 +309,10 @@ private[pulsar] case class PulsarHelper(
private def getTopics(topicsPattern: String): Seq[String] = {
val dest = TopicName.get(topicsPattern)
val allTopics: ju.List[String] = client.getLookup
- .getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL)
- .get()
+ .getTopicsUnderNamespace(
+ // passing an empty topicsHash because we don't cache the GetTopicsResponse
+ dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL, topicsPattern, "")
+ .get().getTopics
val allNonPartitionedTopics: ju.List[String] = allTopics.asScala
.filter(t => !TopicName.get(t).isPartitioned)
@@ -345,7 +347,9 @@ private[pulsar] case class PulsarHelper(
while (waitList.nonEmpty) {
val topic = waitList.head
try {
- client.getPartitionedTopicMetadata(topic).get()
+ // metadataAutoCreationEnabled = false: do not auto-create topic metadata during the lookup.
+ // useFallbackForNonPIP344Brokers = true: keep the old lookup on brokers without PIP-344.
+ client.getPartitionedTopicMetadata(topic, false, true).get()
waitList -= topic
} catch {
case NonFatal(_) =>
diff --git a/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala
index 140fd5c1..7b3bd2ce 100644
--- a/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/pulsar/PulsarAdmissionControlSuite.scala
@@ -3,8 +3,9 @@ package org.apache.spark.sql.pulsar
import java.{util => ju}
import org.apache.pulsar.client.admin.PulsarAdmin
+import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
import org.apache.pulsar.client.api.MessageId
-import org.apache.pulsar.client.internal.DefaultImplementation
+
import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
import org.apache.spark.sql.streaming.Trigger.{Once, ProcessingTime}
import org.apache.spark.util.Utils
@@ -48,10 +49,9 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
// Need to call latestOffsetForTopicPartition so the helper instantiates
// the admin
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, conf)
- val e = intercept[RuntimeException] {
+ intercept[NotFoundException] {
admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
}
- assert(e.getMessage.contains("Failed to load config into existing configuration data"))
}
test("Admit entry in the middle of the ledger") {