Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrading Pulsar Version to 3.3.2 #180

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@

<!-- dependencies -->
<!-- latest version from apache pulsar -->
<pulsar.version>2.10.2</pulsar.version>
<pulsar.version>3.3.2</pulsar.version>
<scala.version>2.12.17</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<scalatest.version>3.2.14</scalatest.version>
Expand Down Expand Up @@ -153,6 +153,11 @@
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.72</version>
</dependency>

<!-- spark dependency -->

Expand Down Expand Up @@ -390,6 +395,7 @@
<include>org.bouncycastle*:*</include>
<include>org.lz4*:*</include>
<include>commons-io:commons-io:jar:*</include>
<include>io.opentelemetry:*</include> <!-- required since Pulsar 3.x: the client depends on OpenTelemetry, so it must be shaded too -->
</includes>
</artifactSet>
<filters>
Expand All @@ -409,6 +415,10 @@
</filter>
</filters>
<relocations>
<relocation>
<pattern>io.opentelemetry</pattern>
<shadedPattern>org.apache.pulsar.shade.io.opentelemetry</shadedPattern>
</relocation>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ import java.util.Locale

import scala.reflect._

import org.apache.pulsar.client.impl.conf.{
ClientConfigurationData,
ProducerConfigurationData,
ReaderConfigurationData
}
import org.apache.pulsar.shade.com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.pulsar.client.impl.conf.{ClientConfigurationData, ProducerConfigurationData, ReaderConfigurationData}

object PulsarConfigurationUtils {

Expand Down
10 changes: 7 additions & 3 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,10 @@ private[pulsar] case class PulsarHelper(
private def getTopics(topicsPattern: String): Seq[String] = {
val dest = TopicName.get(topicsPattern)
val allTopics: ju.List[String] = client.getLookup
.getTopicsUnderNamespace(dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL)
.get()
.getTopicsUnderNamespace(
// passing an empty topicsHash because we don't cache the GetTopicsResponse
dest.getNamespaceObject, CommandGetTopicsOfNamespace.Mode.ALL, topicsPattern, "")
.get().getTopics

val allNonPartitionedTopics: ju.List[String] = allTopics.asScala
.filter(t => !TopicName.get(t).isPartitioned)
Expand Down Expand Up @@ -345,7 +347,9 @@ private[pulsar] case class PulsarHelper(
while (waitList.nonEmpty) {
val topic = waitList.head
try {
client.getPartitionedTopicMetadata(topic).get()
// Pass metadataAutoCreationEnabled = false and useFallbackForNonPIP344Brokers = true
// so the new Pulsar 3.x API preserves the previous client behavior (no topic
// auto-creation here, and compatibility with brokers that predate PIP-344).
client.getPartitionedTopicMetadata(topic, false, true).get()
waitList -= topic
} catch {
case NonFatal(_) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package org.apache.spark.sql.pulsar
import java.{util => ju}

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.internal.DefaultImplementation

import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
import org.apache.spark.sql.streaming.Trigger.{Once, ProcessingTime}
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -48,10 +49,9 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
// Need to call latestOffsetForTopicPartition so the helper instantiates
// the admin
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, conf)
val e = intercept[RuntimeException] {
intercept[NotFoundException] {
admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
}
assert(e.getMessage.contains("Failed to load config into existing configuration data"))
}

test("Admit entry in the middle of the ledger") {
Expand Down
Loading