Skip to content

Commit

Permalink
[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we are request a latest offset. This will cause Spark to reprocess data.

As per suggestion in KAFKA-7703, we put a position call between poll and seekToEnd to block the fetch request triggered by `poll` before calling `seekToEnd`.

In addition, to avoid other unknown issues, we also use the previous known offsets to audit the latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times.

## How was this patch tested?

Jenkins

Closes apache#23324 from zsxwing/SPARK-26267.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
  • Loading branch information
zsxwing committed Dec 21, 2018
1 parent 305e9b5 commit 8e76d66
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
override def initialOffset(): Offset = {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
logInfo(s"Initial offsets: $offsets")
Expand Down Expand Up @@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(

override def needsReconfiguration(config: ScanConfig): Boolean = {
val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
offsetReader.fetchLatestOffsets().keySet != knownPartitions
offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
}

override def toString(): String = s"KafkaSource[$offsetReader]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(

override def latestOffset(start: Offset): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
Expand Down Expand Up @@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
}.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
val untilOffsets = endPartitionOffsets
untilOffsets.foreach { case (tp, untilOffset) =>
fromOffsets.get(tp).foreach { fromOffset =>
if (untilOffset < fromOffset) {
reportDataLoss(s"Partition $tp's offset was changed from " +
s"$fromOffset to $untilOffset, some data may have been missed")
}
}
}

// Calculate offset ranges
val offsetRanges = rangeCalculator.getRanges(
fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
untilOffsets = endPartitionOffsets,
fromOffsets = fromOffsets,
untilOffsets = untilOffsets,
executorLocations = getSortedExecutorList())

// Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
Expand Down Expand Up @@ -186,7 +197,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
case EarliestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
* the read tasks of the skewed partitions to multiple Spark tasks.
* The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
* depending on rounding errors or Kafka partitions that didn't receive any new data.
*
* Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
*/
def getRanges(
fromOffsets: PartitionOffsetMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.{util => ju}
import java.util.concurrent.{Executors, ThreadFactory}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
Expand Down Expand Up @@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader(
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()

// Call `position` to wait until the potential offset request triggered by `poll(0)` is
// done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
// `poll(0)` may reset offsets that should have been set by another request.
partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})

consumer.pause(partitions)
assert(partitions.asScala == partitionOffsets.keySet,
"If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
Expand Down Expand Up @@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader(
/**
* Fetch the latest offsets for the topic partitions that are indicated
* in the [[ConsumerStrategy]].
*
* Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called
* right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after
* `poll` to wait until the potential offset request triggered by `poll(0)` is done.
*
* In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the
* latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less
* than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When
* a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot
* distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying.
*/
def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
def fetchLatestOffsets(
knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly {
withRetriesWithoutInterrupt {
// Poll to get the latest assigned partitions
consumer.poll(0)
val partitions = consumer.assignment()

// Call `position` to wait until the potential offset request triggered by `poll(0)` is
// done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
// `poll(0)` may reset offsets that should have been set by another request.
partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})

consumer.pause(partitions)
logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.")

consumer.seekToEnd(partitions)
val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
if (knownOffsets.isEmpty) {
consumer.seekToEnd(partitions)
partitions.asScala.map(p => p -> consumer.position(p)).toMap
} else {
var partitionOffsets: PartitionOffsetMap = Map.empty

/**
* Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect
* latest offset (offset in `knownOffsets` is great than the one in `partitionOffsets`).
*/
def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = {
var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
partitionOffsets.foreach { case (tp, offset) =>
knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
if (knownOffset > offset) {
val incorrectOffset = (tp, knownOffset, offset)
incorrectOffsets += incorrectOffset
}
})
}
incorrectOffsets
}

// Retry to fetch latest offsets when detecting incorrect offsets. We don't use
// `withRetriesWithoutInterrupt` to retry because:
//
// - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh
// consumer has a much bigger chance to hit KAFKA-7703.
// - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
var attempt = 0
do {
consumer.seekToEnd(partitions)
partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
attempt += 1

incorrectOffsets = findIncorrectOffsets()
if (incorrectOffsets.nonEmpty) {
logWarning("Found incorrect offsets in some partitions " +
s"(partition, previous offset, fetched offset): $incorrectOffsets")
if (attempt < maxOffsetFetchAttempts) {
logWarning("Retrying to fetch latest offsets because of incorrect offsets")
Thread.sleep(offsetFetchAttemptIntervalMs)
}
}
} while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)

logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
}
metadataLog.add(0, offsets)
Expand All @@ -148,7 +148,8 @@ private[kafka010] class KafkaSource(
// Make sure initialPartitionOffsets is initialized
initialPartitionOffsets

val latest = kafkaReader.fetchLatestOffsets()
val latest = kafkaReader.fetchLatestOffsets(
currentPartitionOffsets.orElse(Some(initialPartitionOffsets)))
val offsets = maxOffsetsPerTrigger match {
case None =>
latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}

test("subscribe topic by pattern with topic recreation between batches") {
val topicPrefix = newTopic()
val topic = topicPrefix + "-good"
val topic2 = topicPrefix + "-bad"
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, Array("1", "3"))
testUtils.createTopic(topic2, partitions = 1)
testUtils.sendMessages(topic2, Array("2", "4"))

val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("kafka.default.api.timeout.ms", "3000")
.option("startingOffsets", "earliest")
.option("subscribePattern", s"$topicPrefix-.*")

val ds = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
.map(kv => kv._2.toInt)

testStream(ds)(
StartStream(),
AssertOnQuery { q =>
q.processAllAvailable()
true
},
CheckAnswer(1, 2, 3, 4),
// Restart the stream in this test to make the test stable. When recreating a topic when a
// consumer is alive, it may not be able to see the recreated topic even if a fresh consumer
// has seen it.
StopStream,
// Recreate `topic2` and wait until it's available
WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () =>
testUtils.deleteTopic(topic2)
testUtils.createTopic(topic2)
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
ExpectFailure[IllegalStateException](e => {
// The offset of `topic2` should be changed from 2 to 1
assert(e.getMessage.contains("was changed from 2 to 1"))
})
)
}

test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") {
withTempDir { metadataPath =>
val topic = "kafka-initial-offset-current"
Expand Down

0 comments on commit 8e76d66

Please sign in to comment.