Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dead letter queues #23

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,85 @@
package com.commercetools.queue.aws.sqs

import cats.effect.Async
import cats.syntax.applicativeError._
import cats.syntax.flatMap._
import cats.syntax.functor._
import cats.syntax.functorFilter._
import cats.syntax.monadError._
import cats.syntax.option._
import cats.syntax.all._
import com.commercetools.queue.aws.sqs.makeQueueException
import com.commercetools.queue.{MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueDoesNotExistException}
import com.commercetools.queue.{DeadletterQueueConfiguration, MalformedQueueConfigurationException, QueueAdministration, QueueConfiguration, QueueCreationConfiguration, QueueDoesNotExistException}
import software.amazon.awssdk.protocols.jsoncore.JsonNodeParser
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{CreateQueueRequest, DeleteQueueRequest, GetQueueAttributesRequest, QueueAttributeName, SetQueueAttributesRequest}

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[String])(implicit F: Async[F])
class SQSAdministration[F[_]](
client: SqsAsyncClient,
getQueueUrl: String => F[String],
makeDQLName: String => String
)(implicit F: Async[F])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
/**
* In SQS, a dead letter queue is a standard queue with a `RedriveAllowPolicy` attribute.
* Its ARN will be referenced in the attributes of the source queue, so it is returned
* after creation.
*/
private def createDeadLetterQueue(baseName: String): F[String] = {
val dlqName = makeDQLName(baseName)
F.fromCompletableFuture {
F.delay {
client.createQueue(
CreateQueueRequest
.builder()
.queueName(name)
.queueName(dlqName)
.attributes(Map(
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> messageTTL.toSeconds.toString(),
QueueAttributeName.VISIBILITY_TIMEOUT -> lockTTL.toSeconds.toString()).asJava)
QueueAttributeName.REDRIVE_ALLOW_POLICY -> """{"redrivePermission": "allowAll"}""",
// SQS has a maximum retention period of 14 days
// see https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_CreateQueue.html
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> 14.days.toSeconds.toString()
).asJava)
.build())
}
}.void
.adaptError(makeQueueException(_, name))
}.flatMap { response =>
F.fromCompletableFuture {
F.delay {
client.getQueueAttributes(
GetQueueAttributesRequest
.builder()
.queueUrl(response.queueUrl())
.attributeNames(QueueAttributeName.QUEUE_ARN)
.build())
}
}.flatMap { response =>
val arn = response.attributes().get(QueueAttributeName.QUEUE_ARN)
F.raiseWhen(arn == null)(
MalformedQueueConfigurationException(dlqName, QueueAttributeName.QUEUE_ARN.toString(), "<missing>"))
.as(arn)
}
}
}

override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] =
configuration.deadletter
.traverse(maxAttempts => createDeadLetterQueue(name).map(_ -> maxAttempts))
.flatMap { dlq =>
F.fromCompletableFuture {
F.delay {
client.createQueue(
CreateQueueRequest
.builder()
.queueName(name)
.attributes(Map(
QueueAttributeName.MESSAGE_RETENTION_PERIOD -> Some(configuration.messageTTL.toSeconds.toString()),
QueueAttributeName.VISIBILITY_TIMEOUT -> Some(configuration.lockTTL.toSeconds.toString()),
QueueAttributeName.REDRIVE_POLICY -> dlq.map { case (dlqArn, maxAttempts) =>
s"""{"deadLetterTargetArn":"$dlqArn","maxReceiveCount":$maxAttempts}"""
}
).flattenOption.asJava)
.build())
}
}.void
.adaptError(makeQueueException(_, name))
}

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] =
getQueueUrl(name)
Expand Down Expand Up @@ -77,7 +125,10 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S
GetQueueAttributesRequest
.builder()
.queueUrl(queueUrl)
.attributeNames(QueueAttributeName.MESSAGE_RETENTION_PERIOD, QueueAttributeName.VISIBILITY_TIMEOUT)
.attributeNames(
QueueAttributeName.MESSAGE_RETENTION_PERIOD,
QueueAttributeName.VISIBILITY_TIMEOUT,
QueueAttributeName.REDRIVE_POLICY)
.build())
}
}
Expand All @@ -101,25 +152,41 @@ class SQSAdministration[F[_]](client: SqsAsyncClient, getQueueUrl: String => F[S
ttl.toIntOption
.map(_.seconds)
.liftTo[F](MalformedQueueConfigurationException(name, "lockTTL", ttl)))
} yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
deadletter <- attributes.get(QueueAttributeName.REDRIVE_POLICY).traverse { policy =>
for {
bag <- F.delay(JsonNodeParser.create().parse(policy))
dlq <- F.delay(bag.field("deadLetterTargetArn").get().asString().split(":").last)
maxAttempts <- F.delay(bag.field("maxReceiveCount").get().asNumber().toInt)
} yield DeadletterQueueConfiguration(dlq, maxAttempts)
}
} yield QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter)
}
.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] =
getQueueUrl(name)
.flatMap { queueUrl =>
F.fromCompletableFuture {
F.delay {
client.deleteQueue(
DeleteQueueRequest
.builder()
.queueUrl(queueUrl)
.build())
override def delete(name: String): F[Unit] = {
def doDelete(name: String): F[Unit] =
getQueueUrl(name)
.flatMap { queueUrl =>
F.fromCompletableFuture {
F.delay {
client.deleteQueue(
DeleteQueueRequest
.builder()
.queueUrl(queueUrl)
.build())
}
}
}
}
.void
.adaptError(makeQueueException(_, name))
.void
.adaptError(makeQueueException(_, name))

configuration(name).flatMap { queueConfiguration =>
doDelete(name) *>
queueConfiguration.deadletter.traverse_ { case DeadletterQueueConfiguration(dlname, _) =>
doDelete(dlname)
}
}
}

override def exists(name: String): F[Boolean] =
getQueueUrl(name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest

import java.net.URI

class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) extends QueueClient[F] {
class SQSClient[F[_]] private (client: SqsAsyncClient, makeDLQName: String => String)(implicit F: Async[F])
extends QueueClient[F] {

private def getQueueUrl(name: String): F[String] =
F.fromCompletableFuture {
Expand All @@ -39,7 +40,7 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext
.adaptError(makeQueueException(_, name))

override def administration: QueueAdministration[F] =
new SQSAdministration(client, getQueueUrl(_))
new SQSAdministration(client, getQueueUrl(_), makeDLQName)

override def statistics(name: String): QueueStatistics[F] =
new SQSStatistics(name, client, getQueueUrl(name))
Expand All @@ -54,9 +55,26 @@ class SQSClient[F[_]] private (client: SqsAsyncClient)(implicit F: Async[F]) ext

object SQSClient {

private def defaultMakeDLQName(name: String): String =
s"$name-dlq"

/**
* Creates a new [[SQSClient]].
*
* @param region the region to use
* @param credentials the credentials to use
* @param makeDLQName how the dead letter queue name is derived from the queue name
* by default it suffixes the queue name with `-dlq`
* @param httpClient the existing HTTP client to use.
* '''Note:''' if provided, it is not closed when resource is released,
* otherwise the client manages its own client and will close it when
* released
* @param endpoint the service endpoint to use.
*/
def apply[F[_]](
region: Region,
credentials: AwsCredentialsProvider,
makeDLQName: String => String = defaultMakeDLQName,
endpoint: Option[URI] = None,
httpClient: Option[SdkAsyncHttpClient] = None
)(implicit F: Async[F]
Expand All @@ -74,6 +92,6 @@ object SQSClient {
builder.build()
}
}
.map(new SQSClient(_))
.map(new SQSClient(_, makeDLQName))

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,34 @@ import cats.syntax.functor._
import cats.syntax.monadError._
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions
import com.commercetools.queue.{QueueAdministration, QueueConfiguration}
import com.commercetools.queue.{DeadletterQueueConfiguration, QueueAdministration, QueueConfiguration, QueueCreationConfiguration}

import java.time.Duration
import scala.concurrent.duration._

class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(implicit F: Async[F])
extends QueueAdministration[F] {

override def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit] =
F.blocking(
override def create(name: String, configuration: QueueCreationConfiguration): F[Unit] =
F.blocking {
val options = new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(configuration.messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(configuration.lockTTL.toMillis))
configuration.deadletter match {
case Some(configuration) =>
options
.setMaxDeliveryCount(configuration.maxAttempts)
.setDeadLetteringOnMessageExpiration(true)
case None =>
options
.setMaxDeliveryCount(Int.MaxValue)
.setDeadLetteringOnMessageExpiration(false)
}
client.createQueue(
name,
new CreateQueueOptions()
.setDefaultMessageTimeToLive(Duration.ofMillis(messageTTL.toMillis))
.setLockDuration(Duration.ofMillis(lockTTL.toMillis))))
.void
options
)
}.void
.adaptError(makeQueueException(_, name))

override def update(name: String, messageTTL: Option[FiniteDuration], lockTTL: Option[FiniteDuration]): F[Unit] =
Expand All @@ -45,15 +57,19 @@ class ServiceBusAdministration[F[_]](client: ServiceBusAdministrationClient)(imp
messageTTL.foreach(ttl => properties.setDefaultMessageTimeToLive(Duration.ofMillis(ttl.toMillis)))
lockTTL.foreach(ttl => properties.setLockDuration(Duration.ofMillis(ttl.toMillis)))
val _ = client.updateQueue(properties)
}
}.adaptError(makeQueueException(_, name))

override def configuration(name: String): F[QueueConfiguration] =
F.blocking {
val properties = client.getQueue(name)
val messageTTL = properties.getDefaultMessageTimeToLive().toMillis.millis
val lockTTL = properties.getLockDuration().toMillis.millis
QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL)
}
val deadletter =
Option.when(properties.isDeadLetteringOnMessageExpiration())(
DeadletterQueueConfiguration(properties.getForwardDeadLetteredMessagesTo(), properties.getMaxDeliveryCount()))

QueueConfiguration(messageTTL = messageTTL, lockTTL = lockTTL, deadletter = deadletter)
}.adaptError(makeQueueException(_, name))

override def delete(name: String): F[Unit] =
F.blocking(client.deleteQueue(name))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

/**
* @param name the name of the deadletter queue
* @param maxAttempts the maximum number of delivery attempts made before forwarding a message to the dead letter queue
*/
final case class DeadletterQueueConfiguration(name: String, maxAttempts: Int)
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 Commercetools GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.commercetools.queue

final case class DeadletterQueueCreationConfiguration(maxAttempts: Int)
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ import scala.concurrent.duration.FiniteDuration
*/
trait QueueAdministration[F[_]] {

/** Creates a queue with the given name, message TTL and lock TTL. */
def create(name: String, messageTTL: FiniteDuration, lockTTL: FiniteDuration): F[Unit]
/**
* Creates a queue with the given name and configuration.
* If the configuration contains a `deadletter` element, a dead letter
* queue is created and associated to the main queue with the configured
* maximum delivery attempt.
*/
def create(name: String, configuration: QueueCreationConfiguration): F[Unit]

/**
* Updates the queue with the given name, with provided message TTL and/or lock TTL.
Expand All @@ -35,7 +40,9 @@ trait QueueAdministration[F[_]] {
/** Returns the current configuration settings for the queue. */
def configuration(name: String): F[QueueConfiguration]

/** Deletes the queue with the given name. */
/** Deletes the queue with the given name.
* It also tries to delete the dead letter queue if any is configured and detected.
*/
def delete(name: String): F[Unit]

/** Indicates whether the queue with the given name exists. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,12 @@ package com.commercetools.queue

import scala.concurrent.duration.FiniteDuration

final case class QueueConfiguration(messageTTL: FiniteDuration, lockTTL: FiniteDuration)
/**
* @param messageTTL the time a message is guaranteed to stay in the queue before being discarded by the underlying system
* @param lockTTL the time a message is locked (or leased) upon reception by a subscriber before being eligible to redelivery
* @param deadletter the dead-letter queue configuration if any
*/
final case class QueueConfiguration(
messageTTL: FiniteDuration,
lockTTL: FiniteDuration,
deadletter: Option[DeadletterQueueConfiguration])
Loading
Loading