removing unnecessary lower limit on fetch interval #11

Status: Open (1 commit, base branch: master)
4 changes: 2 additions & 2 deletions README.md
@@ -49,7 +49,7 @@ sqsUrl|required, no default value|sqs queue url, like 'https://sqs.us-east-1.ama
region|required, no default value|AWS region where queue is created
fileFormat|required, no default value|file format for the s3 files stored on Amazon S3
schema|required, no default value|schema of the data being read
- sqsFetchIntervalSeconds|10|time interval (in seconds) after which to fetch messages from Amazon SQS queue
+ sqsFetchIntervalMilliSeconds|10000|time interval (in milliseconds) after which to fetch messages from Amazon SQS queue
sqsLongPollingWaitTimeSeconds|20|wait time (in seconds) for long polling on Amazon SQS queue
sqsMaxConnections|1|number of parallel threads to connect to Amazon SQS queue
sqsMaxRetries|10|Maximum number of consecutive retries in case of a connection failure to SQS before giving up
@@ -71,7 +71,7 @@ An example to create a SQL stream which uses Amazon SQS to list files on S3,
.option("sqsUrl", queueUrl)
.option("region", awsRegion)
.option("fileFormat", "json")
- .option("sqsFetchIntervalSeconds", "2")
+ .option("sqsFetchIntervalMilliSeconds", "2000")
.option("useInstanceProfileCredentials", "true")
.option("sqsLongPollingWaitTimeSeconds", "5")
.load()
12 changes: 7 additions & 5 deletions pom.xml
@@ -19,8 +19,8 @@
<modelVersion>4.0.0</modelVersion>

<groupId>com.qubole</groupId>
- <artifactId>spark-sql-streaming-sqs_2.11</artifactId>
- <version>0.5.2-SNAPSHOT</version>
+ <artifactId>spark-sql-streaming-sqs_2.12</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Spark SQL Streaming SQS</name>
<description>Connector for faster reads from S3 using SQS</description>
@@ -49,7 +49,7 @@
<connection>scm:git:git://github.com/qubole/s3-sqs-connector.git</connection>
<url>https://github.com/qubole/s3-sqs-connector</url>
<developerConnection>scm:git:git@github.com:qubole/s3-sqs-connector.git</developerConnection>
- <tag>spark-sql-streaming-sqs_2.11-0.5.1</tag>
+ <tag>spark-sql-streaming-sqs_2.12-1.0.0</tag>
</scm>

<inceptionYear>2020</inceptionYear>
@@ -60,9 +60,11 @@

<properties>
<sbt.project.name>spark-sql-streaming-sqs</sbt.project.name>
- <spark.version>2.4.0</spark.version>
- <scala.binary.version>2.11</scala.binary.version>
+ <spark.version>3.1.1</spark.version>
+ <scala.binary.version>2.12</scala.binary.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <maven.compiler.source>1.7</maven.compiler.source>
+ <maven.compiler.target>1.7</maven.compiler.target>
</properties>

<dependencies>
@@ -38,7 +38,7 @@ import org.apache.spark.util.ThreadUtils
class SqsClient(sourceOptions: SqsSourceOptions,
hadoopConf: Configuration) extends Logging {

- private val sqsFetchIntervalSeconds = sourceOptions.fetchIntervalSeconds
+ private val sqsFetchIntervalMilliSeconds = sourceOptions.fetchIntervalMilliSeconds
private val sqsLongPollWaitTimeSeconds = sourceOptions.longPollWaitTimeSeconds
private val sqsMaxRetries = sourceOptions.maxRetries
private val maxConnections = sourceOptions.maxConnections
@@ -81,8 +81,8 @@ class SqsClient(sourceOptions: SqsSourceOptions,
sqsScheduler.scheduleWithFixedDelay(
sqsFetchMessagesThread,
0,
- sqsFetchIntervalSeconds,
- TimeUnit.SECONDS)
+ sqsFetchIntervalMilliSeconds,
+ TimeUnit.MILLISECONDS)

private def sqsFetchMessages(): Seq[(String, Long, String)] = {
val messageList = try {
@@ -192,7 +192,7 @@ class SqsClient(sourceOptions: SqsSourceOptions,
s"${sqsMaxRetries} times Giving up. Check logs for details."))
} else {
logWarning(s"Attempt ${retriesOnFailure}." +
- s"Will reattempt after ${sqsFetchIntervalSeconds} seconds")
+ s"Will reattempt after ${sqsFetchIntervalMilliSeconds} milliseconds")
}
}
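
Note on the scheduling change above: the hunk switches the unit passed to `scheduleWithFixedDelay` from seconds to milliseconds, which is what allows sub-second fetch intervals. The following is a minimal standalone sketch of that call, not the connector's code. `FixedDelaySketch`, `runFixedDelay`, and its parameters are hypothetical names used only for illustration, and a countdown latch stands in for the real SQS fetch task.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical illustration; the connector schedules its own fetch thread instead.
object FixedDelaySketch {
  // Schedules a task with a fixed delay given in milliseconds and reports
  // whether it ran at least `runs` times before `timeoutMillis` elapsed.
  def runFixedDelay(intervalMillis: Long, runs: Int, timeoutMillis: Long): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val latch = new CountDownLatch(runs)
    val task = new Runnable {
      // Stand-in for the SQS fetch: just record that one run happened.
      override def run(): Unit = latch.countDown()
    }
    try {
      // Initial delay 0, then `intervalMillis` between the end of one run
      // and the start of the next.
      scheduler.scheduleWithFixedDelay(task, 0L, intervalMillis, TimeUnit.MILLISECONDS)
      latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

With a fixed delay, the interval is measured from the end of one run to the start of the next, so a slow fetch pushes later runs back rather than causing overlapping runs.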

@@ -51,12 +51,12 @@ class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging {
val maxFileAgeMs: Long =
Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))

- val fetchIntervalSeconds: Int = parameters.get("sqsFetchIntervalSeconds").map { str =>
+ val fetchIntervalMilliSeconds: Int = parameters.get("sqsFetchIntervalMilliSeconds").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
- s"Invalid value '$str' for option 'sqsFetchIntervalSeconds', must be a positive integer")
+ s"Invalid value '$str' for option 'sqsFetchIntervalMilliSeconds', must be a positive integer")
}
- }.getOrElse(10)
+ }.getOrElse(10000)

val longPollWaitTimeSeconds: Int = parameters.get("sqsLongPollingWaitTimeSeconds").map { str =>
Try(str.toInt).toOption.filter(x => x >= 0 && x <= 20).getOrElse {
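
Note on the option parsing in this file: the renamed option keeps the same validation (a positive integer) and changes only the unit and the default, from 10 seconds to 10000 milliseconds. Below is a minimal sketch of that behaviour outside Spark. It assumes a plain `Map[String, String]` in place of `CaseInsensitiveMap`, so key lookup here is case-sensitive, and `FetchIntervalSketch` and `parseFetchIntervalMillis` are hypothetical names, not part of the connector.

```scala
import scala.util.Try

// Hypothetical standalone helper mirroring the validation shown in the diff.
object FetchIntervalSketch {
  val OptionName = "sqsFetchIntervalMilliSeconds"
  val DefaultMillis = 10000

  // Returns the configured interval in milliseconds, the default when the
  // option is absent, or throws when the value is not a positive integer.
  def parseFetchIntervalMillis(parameters: Map[String, String]): Int =
    parameters.get(OptionName).map { str =>
      Try(str.toInt).toOption.filter(_ > 0).getOrElse {
        throw new IllegalArgumentException(
          s"Invalid value '$str' for option '$OptionName', must be a positive integer")
      }
    }.getOrElse(DefaultMillis)
}
```

For example, passing `Map("sqsFetchIntervalMilliSeconds" -> "250")` yields an interval of 250 ms, while `"0"`, a negative number, or a non-numeric string is rejected with an `IllegalArgumentException`.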