diff --git a/README.md b/README.md index fd73728..0e1aa5e 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ sqsUrl|required, no default value|sqs queue url, like 'https://sqs.us-east-1.ama region|required, no default value|AWS region where queue is created fileFormat|required, no default value|file format for the s3 files stored on Amazon S3 schema|required, no default value|schema of the data being read -sqsFetchIntervalSeconds|10|time interval (in seconds) after which to fetch messages from Amazon SQS queue +sqsFetchIntervalMilliSeconds|10000|time interval (in milliseconds) after which to fetch messages from Amazon SQS queue sqsLongPollingWaitTimeSeconds|20|wait time (in seconds) for long polling on Amazon SQS queue sqsMaxConnections|1|number of parallel threads to connect to Amazon SQS queue sqsMaxRetries|10|Maximum number of consecutive retries in case of a connection failure to SQS before giving up @@ -71,7 +71,7 @@ An example to create a SQL stream which uses Amazon SQS to list files on S3, .option("sqsUrl", queueUrl) .option("region", awsRegion) .option("fileFormat", "json") - .option("sqsFetchIntervalSeconds", "2") + .option("sqsFetchIntervalMilliSeconds", "2000") .option("useInstanceProfileCredentials", "true") .option("sqsLongPollingWaitTimeSeconds", "5") .load() diff --git a/pom.xml b/pom.xml index 4da0697..e2af6a4 100644 --- a/pom.xml +++ b/pom.xml @@ -19,8 +19,8 @@ 4.0.0 com.qubole - spark-sql-streaming-sqs_2.11 - 0.5.2-SNAPSHOT + spark-sql-streaming-sqs_2.12 + 1.0.0-SNAPSHOT jar Spark SQL Streaming SQS Connector for faster reads from S3 using SQS @@ -49,7 +49,7 @@ scm:git:git://github.com/qubole/s3-sqs-connector.git https://github.com/qubole/s3-sqs-connector scm:git:git@github.com:qubole/s3-sqs-connector.git - spark-sql-streaming-sqs_2.11-0.5.1 + spark-sql-streaming-sqs_2.12-1.0.0 2020 @@ -60,9 +60,11 @@ spark-sql-streaming-sqs - 2.4.0 - 2.11 + 3.1.1 + 2.12 UTF-8 + 1.7 + 1.7 diff --git 
a/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala b/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala index b8c899d..c54dba1 100644 --- a/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala +++ b/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsClient.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.ThreadUtils class SqsClient(sourceOptions: SqsSourceOptions, hadoopConf: Configuration) extends Logging { - private val sqsFetchIntervalSeconds = sourceOptions.fetchIntervalSeconds + private val sqsFetchIntervalMilliSeconds = sourceOptions.fetchIntervalMilliSeconds private val sqsLongPollWaitTimeSeconds = sourceOptions.longPollWaitTimeSeconds private val sqsMaxRetries = sourceOptions.maxRetries private val maxConnections = sourceOptions.maxConnections @@ -81,8 +81,8 @@ class SqsClient(sourceOptions: SqsSourceOptions, sqsScheduler.scheduleWithFixedDelay( sqsFetchMessagesThread, 0, - sqsFetchIntervalSeconds, - TimeUnit.SECONDS) + sqsFetchIntervalMilliSeconds, + TimeUnit.MILLISECONDS) private def sqsFetchMessages(): Seq[(String, Long, String)] = { val messageList = try { @@ -192,7 +192,7 @@ class SqsClient(sourceOptions: SqsSourceOptions, s"${sqsMaxRetries} times Giving up. Check logs for details.")) } else { logWarning(s"Attempt ${retriesOnFailure}." 
+ - s"Will reattempt after ${sqsFetchIntervalSeconds} seconds") + s"Will reattempt after ${sqsFetchIntervalMilliSeconds} milliseconds") } } diff --git a/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala b/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala index a4c0cc1..4623372 100644 --- a/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala +++ b/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSourceOptions.scala @@ -51,12 +51,12 @@ class SqsSourceOptions(parameters: CaseInsensitiveMap[String]) extends Logging { val maxFileAgeMs: Long = Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d")) - val fetchIntervalSeconds: Int = parameters.get("sqsFetchIntervalSeconds").map { str => + val fetchIntervalMilliSeconds: Int = parameters.get("sqsFetchIntervalMilliSeconds").map { str => Try(str.toInt).toOption.filter(_ > 0).getOrElse { throw new IllegalArgumentException( - s"Invalid value '$str' for option 'sqsFetchIntervalSeconds', must be a positive integer") + s"Invalid value '$str' for option 'sqsFetchIntervalMilliSeconds', must be a positive integer") } - }.getOrElse(10) + }.getOrElse(10000) val longPollWaitTimeSeconds: Int = parameters.get("sqsLongPollingWaitTimeSeconds").map { str => Try(str.toInt).toOption.filter(x => x >= 0 && x <= 20).getOrElse {