From fd12d48c45b18e642a8002663e72f37ee83528c6 Mon Sep 17 00:00:00 2001 From: Andreas Gebhardt Date: Fri, 8 Apr 2022 20:49:41 +0200 Subject: [PATCH] [WIP] large SQS message --- aws/sqs/java/aws-sqs/pom.xml | 22 +++++++++++ .../main/java/com/github/agebhar1/App.java | 37 +++++++++++++++++-- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/aws/sqs/java/aws-sqs/pom.xml b/aws/sqs/java/aws-sqs/pom.xml index dafe296..9b01789 100644 --- a/aws/sqs/java/aws-sqs/pom.xml +++ b/aws/sqs/java/aws-sqs/pom.xml @@ -37,6 +37,16 @@ + + com.amazonaws + amazon-sqs-java-extended-client-lib + 2.0.2 + + + com.fasterxml.jackson.core + jackson-databind + 2.13.2.2 + software.amazon.awssdk bom @@ -63,6 +73,14 @@ + + com.amazonaws + amazon-sqs-java-extended-client-lib + + + com.fasterxml.jackson.core + jackson-databind + org.slf4j jcl-over-slf4j @@ -71,6 +89,10 @@ org.slf4j slf4j-simple + + software.amazon.awssdk + s3 + software.amazon.awssdk sqs diff --git a/aws/sqs/java/aws-sqs/src/main/java/com/github/agebhar1/App.java b/aws/sqs/java/aws-sqs/src/main/java/com/github/agebhar1/App.java index 5bad975..0357be6 100644 --- a/aws/sqs/java/aws-sqs/src/main/java/com/github/agebhar1/App.java +++ b/aws/sqs/java/aws-sqs/src/main/java/com/github/agebhar1/App.java @@ -1,9 +1,12 @@ package com.github.agebhar1; +import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; +import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sqs.SqsAsyncClient; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.SqsException; @@ -13,6 +16,8 @@ import java.time.Instant; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class App { @@ -21,14 +26,16 @@ public class App { private final static String Queue = ""; private final static Logger logger = LoggerFactory.getLogger(App.class); + private final static String Bucket = ""; + public static void main(String[] args) throws URISyntaxException { - async(args); + largeMsg(args); } public static void async(String[] args) throws URISyntaxException { final var asyncSqsClient = SqsAsyncClient.builder() - .credentialsProvider(ProfileCredentialsProvider.create("")) + .credentialsProvider(ProfileCredentialsProvider.create()) .region(Region.EU_CENTRAL_1) .build(); @@ -41,16 +48,38 @@ public static void async(String[] args) throws URISyntaxException { public static void sync(String[] args) throws URISyntaxException { final var sqsClient = SqsClient.builder() - .credentialsProvider(ProfileCredentialsProvider.create("")) + .credentialsProvider(ProfileCredentialsProvider.create()) .region(Region.EU_CENTRAL_1) .build(); var message = "Hello SQS! " + Instant.now(); - sendMessage(sqsClient, new URI(Queue), message); + sendMessage(sqsClient, new URI(Queue), message + "B"); receiveMessages(sqsClient, new URI(Queue)); } + public static void largeMsg(String[] args) throws URISyntaxException { + + var s3 = S3Client.builder() + .credentialsProvider(ProfileCredentialsProvider.create()) + .region(Region.EU_CENTRAL_1) + .build(); + + final var sqsClient = SqsClient.builder() + .credentialsProvider(ProfileCredentialsProvider.create()) + .region(Region.EU_CENTRAL_1) + .build(); + + var cfg = new ExtendedClientConfiguration() + .withPayloadSupportEnabled(s3, Bucket); + var xtd = new AmazonSQSExtendedClient(sqsClient, cfg); + + var message = Stream.generate(() -> "A").limit(256 * 1024).collect(Collectors.joining()); + + sendMessage(xtd, new URI(Queue), message + "B"); + receiveMessages(xtd, new URI(Queue)); + } + private static void sendMessage(final SqsClient sqsClient, final URI queue, final String message) { logger.info("Try to send message async to SQS: {}.", queue); try {