From 9f87cd8106c5b4992e61e8d5b18e4cbb2a9dc087 Mon Sep 17 00:00:00 2001
From: Aishwarya Rajendra Murkute <44307949+Aishwarya2203@users.noreply.github.com>
Date: Sat, 20 Apr 2024 02:44:36 -0700
Subject: [PATCH] Bug fix to filter uncommitted messages when sorting is disabled and maxFilesPerTrigger is set

Bug/Issue: Incorrect filter condition, leading to no files being given to Spark.

When working with the connector code, we had to disable sorting to improve
performance in our application. After disabling sorting, we noticed that the
condition used to filter the uncommitted messages out of the cache is incorrect
and will always be false. Since we want to filter for uncommitted messages, the
condition should look for files that are not committed, similar to the code
path here:
https://github.com/qubole/s3-sqs-connector/blob/master/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala#L118
---
 .../scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala b/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala
index ff42448..00f47b0 100644
--- a/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala
+++ b/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsFileCache.scala
@@ -95,7 +95,7 @@ class SqsFileCache(maxAgeMs: Long, fileNameOnly: Boolean) extends Logging {
       val uncommittedFiles = ListBuffer[(String, Long, String)]()
       while (uncommittedFiles.length < maxFilesPerTrigger && iterator.hasNext) {
         val file = iterator.next()
-        if (file._2.isCommitted && file._2.timestamp >= lastPurgeTimestamp) {
+        if (!file._2.isCommitted && file._2.timestamp >= lastPurgeTimestamp) {
           uncommittedFiles += ((file._1, file._2.timestamp, file._2.messageReceiptHandle))
         }
       }
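
For reviewers, below is a minimal, self-contained Scala sketch of the selection
logic after the fix. It is an illustration only, not connector code: FileEntry,
the hard-coded cache contents, and the sample values for maxFilesPerTrigger and
lastPurgeTimestamp are invented for the example, while the field names
(isCommitted, timestamp, messageReceiptHandle) mirror the ones used in the
patched loop.

import scala.collection.mutable.ListBuffer

// Simplified stand-in for the per-file metadata; the real SqsFileCache stores
// a richer value type, but these three fields mirror the ones the loop reads.
case class FileEntry(timestamp: Long, isCommitted: Boolean, messageReceiptHandle: String)

object UncommittedSelectionSketch {
  def main(args: Array[String]): Unit = {
    val maxFilesPerTrigger = 2
    val lastPurgeTimestamp = 100L

    // Hypothetical cache contents: path -> entry.
    val cache = Seq(
      "s3://bucket/a.json" -> FileEntry(150L, isCommitted = true, "rh-a"),
      "s3://bucket/b.json" -> FileEntry(160L, isCommitted = false, "rh-b"),
      "s3://bucket/c.json" -> FileEntry(50L, isCommitted = false, "rh-c"), // older than purge watermark
      "s3://bucket/d.json" -> FileEntry(170L, isCommitted = false, "rh-d")
    )

    val iterator = cache.iterator
    val uncommittedFiles = ListBuffer[(String, Long, String)]()
    while (uncommittedFiles.length < maxFilesPerTrigger && iterator.hasNext) {
      val file = iterator.next()
      // Fixed condition: keep entries that are NOT yet committed and are newer
      // than the last purge timestamp. With the old `file._2.isCommitted`
      // check, a cache of uncommitted files never produced any matches.
      if (!file._2.isCommitted && file._2.timestamp >= lastPurgeTimestamp) {
        uncommittedFiles += ((file._1, file._2.timestamp, file._2.messageReceiptHandle))
      }
    }

    // Expect b.json and d.json: a.json is already committed, c.json is too old.
    uncommittedFiles.foreach(println)
  }
}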