diff --git a/docs/utilities/validation.md b/docs/utilities/validation.md index 226e10bb6..28ed1161f 100644 --- a/docs/utilities/validation.md +++ b/docs/utilities/validation.md @@ -8,7 +8,7 @@ This utility provides JSON Schema validation for payloads held within events and **Key features** * Validate incoming events and responses -* Built-in validation for most common events (API Gateway, SNS, SQS, ...) +* Built-in validation for most common events (API Gateway, SNS, SQS, ...) and support for partial batch failures (SQS, Kinesis) * JMESPath support validate only a sub part of the event ## Install @@ -92,10 +92,15 @@ The validator is configured to enable format assertions by default even for 2019 `@Validation` annotation is used to validate either inbound events or functions' response. It will fail fast if an event or response doesn't conform with given JSON Schema. For most type of events a `ValidationException` will be thrown. + For API gateway events associated with REST APIs and HTTP APIs - `APIGatewayProxyRequestEvent` and `APIGatewayV2HTTPEvent` - the `@Validation` annotation will build and return a custom 400 / "Bad Request" response, with a body containing the validation errors. This saves you from having to catch the validation exception and map it back to a meaningful user error yourself. +For SQS and Kinesis events - `SQSEvent` and `KinesisEvent`- the `@Validation` annotation will add the invalid messages +to the batch item failures list in the response, respectively `SQSBatchResponse` and `StreamsEventResponse` +and removed from the event so that you do not process them within the handler. + While it is easier to specify a json schema file in the classpath (using the notation `"classpath:/path/to/schema.json"`), you can also provide a JSON String containing the schema. === "MyFunctionHandler.java" @@ -152,31 +157,31 @@ For the following events and responses, the Validator will automatically perform ** Events ** - Type of event | Class | Path to content | - ------------------------------------------------- | ------------------------------------------------- | ------------------------------------------------- - API Gateway REST | APIGatewayProxyRequestEvent | `body` - API Gateway HTTP | APIGatewayV2HTTPEvent | `body` - Application Load Balancer | ApplicationLoadBalancerRequestEvent | `body` - Cloudformation Custom Resource | CloudFormationCustomResourceEvent | `resourceProperties` - CloudWatch Logs | CloudWatchLogsEvent | `awslogs.powertools_base64_gzip(data)` - EventBridge / Cloudwatch | ScheduledEvent | `detail` - Kafka | KafkaEvent | `records[*][*].value` - Kinesis | KinesisEvent | `Records[*].kinesis.powertools_base64(data)` - Kinesis Firehose | KinesisFirehoseEvent | `Records[*].powertools_base64(data)` - Kinesis Analytics from Firehose | KinesisAnalyticsFirehoseInputPreprocessingEvent | `Records[*].powertools_base64(data)` - Kinesis Analytics from Streams | KinesisAnalyticsStreamsInputPreprocessingEvent | `Records[*].powertools_base64(data)` - SNS | SNSEvent | `Records[*].Sns.Message` - SQS | SQSEvent | `Records[*].body` +| Type of event | Class | Path to content | +|---------------------------------|-------------------------------------------------|----------------------------------------------| +| API Gateway REST | APIGatewayProxyRequestEvent | `body` | +| API Gateway HTTP | APIGatewayV2HTTPEvent | `body` | +| Application Load Balancer | ApplicationLoadBalancerRequestEvent | `body` | +| Cloudformation Custom Resource | CloudFormationCustomResourceEvent | `resourceProperties` | +| CloudWatch Logs | CloudWatchLogsEvent | `awslogs.powertools_base64_gzip(data)` | +| EventBridge / Cloudwatch | ScheduledEvent | `detail` | +| Kafka | KafkaEvent | `records[*][*].value` | +| Kinesis | KinesisEvent | `Records[*].kinesis.powertools_base64(data)` | +| Kinesis Firehose | KinesisFirehoseEvent | `Records[*].powertools_base64(data)` | +| Kinesis Analytics from Firehose | KinesisAnalyticsFirehoseInputPreprocessingEvent | `Records[*].powertools_base64(data)` | +| Kinesis Analytics from Streams | KinesisAnalyticsStreamsInputPreprocessingEvent | `Records[*].powertools_base64(data)` | +| SNS | SNSEvent | `Records[*].Sns.Message` | +| SQS | SQSEvent | `Records[*].body` | ** Responses ** - Type of response | Class | Path to content (envelope) - ------------------------------------------------- | ------------------------------------------------- | ------------------------------------------------- - API Gateway REST | APIGatewayProxyResponseEvent} | `body` - API Gateway HTTP | APIGatewayV2HTTPResponse} | `body` - API Gateway WebSocket | APIGatewayV2WebSocketResponse} | `body` - Load Balancer | ApplicationLoadBalancerResponseEvent} | `body` - Kinesis Analytics | KinesisAnalyticsInputPreprocessingResponse} | `Records[*].powertools_base64(data)`` +| Type of response | Class | Path to content (envelope) | +|-----------------------|---------------------------------------------|---------------------------------------| +| API Gateway REST | APIGatewayProxyResponseEvent} | `body` | +| API Gateway HTTP | APIGatewayV2HTTPResponse} | `body` | +| API Gateway WebSocket | APIGatewayV2WebSocketResponse} | `body` | +| Load Balancer | ApplicationLoadBalancerResponseEvent} | `body` | +| Kinesis Analytics | KinesisAnalyticsInputPreprocessingResponse} | `Records[*].powertools_base64(data)`` | ## Custom events and responses diff --git a/pom.xml b/pom.xml index b19520867..bec482d8e 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ 4.1.2 0.6.0 1.6.0 + 5.12.0 @@ -307,6 +308,12 @@ + + org.slf4j + slf4j-simple + ${slf4j.version} + test + org.skyscreamer jsonassert @@ -318,6 +325,12 @@ aspectjtools ${aspectj.version} + + org.mockito + mockito-core + ${mockito.version} + test + com.amazonaws aws-lambda-java-tests @@ -456,6 +469,33 @@ true + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.3.0 + + basedir=${project.rootdir} + checkstyle.xml + true + true + false + + + + + com.puppycrawl.tools + checkstyle + 10.12.3 + + + + + + check + + + + @@ -585,100 +625,6 @@ - - olderThanJdk11 - - (,11) - - - - 4.11.0 - - - - org.mockito - mockito-core - ${mockito.version} - test - - - org.mockito - mockito-inline - ${mockito.version} - test - - - - - newerThanJdk11 - - [11,) - - - 5.6.0 - - - - - org.mockito - mockito-core - ${mockito.version} - test - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - - true - - - - - - - - newerThanJdk8 - - [9,) - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - 3.3.0 - - basedir=${project.rootdir} - checkstyle.xml - true - true - false - - - - - com.puppycrawl.tools - checkstyle - 10.12.3 - - - - - - check - - - - - - - diff --git a/powertools-batch/pom.xml b/powertools-batch/pom.xml index 66a5e3087..819c19927 100644 --- a/powertools-batch/pom.xml +++ b/powertools-batch/pom.xml @@ -73,6 +73,16 @@ aws-lambda-java-tests test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + \ No newline at end of file diff --git a/powertools-cloudformation/pom.xml b/powertools-cloudformation/pom.xml index e3e4748d6..70650e51a 100644 --- a/powertools-cloudformation/pom.xml +++ b/powertools-cloudformation/pom.xml @@ -75,6 +75,16 @@ junit-jupiter-params test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.assertj assertj-core diff --git a/powertools-common/pom.xml b/powertools-common/pom.xml index 15409e1f6..da1c5b17d 100644 --- a/powertools-common/pom.xml +++ b/powertools-common/pom.xml @@ -71,6 +71,16 @@ assertj-core test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + diff --git a/powertools-idempotency/powertools-idempotency-core/pom.xml b/powertools-idempotency/powertools-idempotency-core/pom.xml index 302cc24f5..59b69da0f 100644 --- a/powertools-idempotency/powertools-idempotency-core/pom.xml +++ b/powertools-idempotency/powertools-idempotency-core/pom.xml @@ -38,5 +38,15 @@ powertools-serialization + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + \ No newline at end of file diff --git a/powertools-large-messages/pom.xml b/powertools-large-messages/pom.xml index 4206183de..18fddc6cb 100644 --- a/powertools-large-messages/pom.xml +++ b/powertools-large-messages/pom.xml @@ -97,6 +97,16 @@ junit-pioneer test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-logging/pom.xml b/powertools-logging/pom.xml index a1148a9cd..a6303c66b 100644 --- a/powertools-logging/pom.xml +++ b/powertools-logging/pom.xml @@ -67,6 +67,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-logging/powertools-logging-log4j/pom.xml b/powertools-logging/powertools-logging-log4j/pom.xml index d76137678..35cd1c189 100644 --- a/powertools-logging/powertools-logging-log4j/pom.xml +++ b/powertools-logging/powertools-logging-log4j/pom.xml @@ -51,6 +51,11 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + org.apache.commons commons-lang3 diff --git a/powertools-logging/powertools-logging-logback/pom.xml b/powertools-logging/powertools-logging-logback/pom.xml index 11fc85b72..179c7822a 100644 --- a/powertools-logging/powertools-logging-logback/pom.xml +++ b/powertools-logging/powertools-logging-logback/pom.xml @@ -48,6 +48,11 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + org.apache.commons commons-lang3 diff --git a/powertools-metrics/pom.xml b/powertools-metrics/pom.xml index 0daa49664..d32fe2021 100644 --- a/powertools-metrics/pom.xml +++ b/powertools-metrics/pom.xml @@ -77,6 +77,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-parameters/powertools-parameters-appconfig/pom.xml b/powertools-parameters/powertools-parameters-appconfig/pom.xml index 34b1238f6..4a08a05fd 100644 --- a/powertools-parameters/powertools-parameters-appconfig/pom.xml +++ b/powertools-parameters/powertools-parameters-appconfig/pom.xml @@ -52,6 +52,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-parameters/powertools-parameters-dynamodb/pom.xml b/powertools-parameters/powertools-parameters-dynamodb/pom.xml index 2ec6ad27c..5bdedea20 100644 --- a/powertools-parameters/powertools-parameters-dynamodb/pom.xml +++ b/powertools-parameters/powertools-parameters-dynamodb/pom.xml @@ -53,6 +53,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-parameters/powertools-parameters-secrets/pom.xml b/powertools-parameters/powertools-parameters-secrets/pom.xml index 3275d0ee0..8b2902d9b 100644 --- a/powertools-parameters/powertools-parameters-secrets/pom.xml +++ b/powertools-parameters/powertools-parameters-secrets/pom.xml @@ -53,6 +53,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-parameters/powertools-parameters-ssm/pom.xml b/powertools-parameters/powertools-parameters-ssm/pom.xml index 65332c9ef..9d9982c26 100644 --- a/powertools-parameters/powertools-parameters-ssm/pom.xml +++ b/powertools-parameters/powertools-parameters-ssm/pom.xml @@ -53,6 +53,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-parameters/powertools-parameters-tests/pom.xml b/powertools-parameters/powertools-parameters-tests/pom.xml index d8e9b2a02..6ab2e4155 100644 --- a/powertools-parameters/powertools-parameters-tests/pom.xml +++ b/powertools-parameters/powertools-parameters-tests/pom.xml @@ -48,6 +48,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.apache.commons commons-lang3 diff --git a/powertools-serialization/pom.xml b/powertools-serialization/pom.xml index 454b30d3e..e4e059626 100644 --- a/powertools-serialization/pom.xml +++ b/powertools-serialization/pom.xml @@ -56,6 +56,11 @@ junit-jupiter-api test + + org.slf4j + slf4j-simple + test + org.assertj assertj-core diff --git a/powertools-tracing/pom.xml b/powertools-tracing/pom.xml index 345f14194..e7869b5f5 100644 --- a/powertools-tracing/pom.xml +++ b/powertools-tracing/pom.xml @@ -78,6 +78,16 @@ junit-jupiter-engine test + + org.mockito + mockito-core + test + + + org.slf4j + slf4j-simple + test + org.junit-pioneer junit-pioneer diff --git a/powertools-validation/pom.xml b/powertools-validation/pom.xml index c1521f1c7..7a686bfc2 100644 --- a/powertools-validation/pom.xml +++ b/powertools-validation/pom.xml @@ -83,7 +83,21 @@ junit-jupiter-engine test - + + org.slf4j + slf4j-simple + test + + + org.mockito + mockito-core + test + + + com.amazonaws + aws-lambda-java-tests + test + org.apache.commons commons-lang3 diff --git a/powertools-validation/src/main/java/software/amazon/lambda/powertools/validation/internal/ValidationAspect.java b/powertools-validation/src/main/java/software/amazon/lambda/powertools/validation/internal/ValidationAspect.java index bcbba9e03..68900d334 100644 --- a/powertools-validation/src/main/java/software/amazon/lambda/powertools/validation/internal/ValidationAspect.java +++ b/powertools-validation/src/main/java/software/amazon/lambda/powertools/validation/internal/ValidationAspect.java @@ -40,11 +40,15 @@ import com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent; import com.amazonaws.services.lambda.runtime.events.RabbitMQEvent; import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.ScheduledEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.networknt.schema.JsonSchema; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.ListIterator; import java.util.Map; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; @@ -57,7 +61,7 @@ import software.amazon.lambda.powertools.validation.ValidationException; /** - * Aspect for {@link Validation} annotation + * Aspect for {@link Validation} annotation. Internal to Powertools, use the annotation itself. */ @Aspect public class ValidationAspect { @@ -77,11 +81,12 @@ public Object around(ProceedingJoinPoint pjp, if (validation.schemaVersion() != V201909) { ValidationConfig.get().setSchemaVersion(validation.schemaVersion()); } - + // we need this result object to be null at this point as validation of API events, if // it fails, will catch the ValidationException and generate a 400 API response. This response // will be stored in the result object to prevent executing the lambda - Object result = null; + Object validationResult = null; + boolean failFast = false; if (placedOnRequestHandler(pjp)) { validationNeeded = true; @@ -94,16 +99,19 @@ public Object around(ProceedingJoinPoint pjp, validate(obj, inboundJsonSchema, validation.envelope()); } else if (obj instanceof APIGatewayProxyRequestEvent) { APIGatewayProxyRequestEvent event = (APIGatewayProxyRequestEvent) obj; - result = validateAPIGatewayProxyBody(event.getBody(), inboundJsonSchema, null, null); + validationResult = validateAPIGatewayProxyBody(event.getBody(), inboundJsonSchema, null, null); + failFast = true; } else if (obj instanceof APIGatewayV2HTTPEvent) { APIGatewayV2HTTPEvent event = (APIGatewayV2HTTPEvent) obj; - result = validateAPIGatewayV2HTTPBody(event.getBody(), inboundJsonSchema, null, null); + validationResult = validateAPIGatewayV2HTTPBody(event.getBody(), inboundJsonSchema, null, null); + failFast = true; } else if (obj instanceof SNSEvent) { SNSEvent event = (SNSEvent) obj; - event.getRecords().forEach(record -> validate(record.getSNS().getMessage(), inboundJsonSchema)); + event.getRecords() + .forEach(snsRecord -> validate(snsRecord.getSNS().getMessage(), inboundJsonSchema)); } else if (obj instanceof SQSEvent) { SQSEvent event = (SQSEvent) obj; - event.getRecords().forEach(record -> validate(record.getBody(), inboundJsonSchema)); + validationResult = validateSQSEventMessages(event.getRecords(), inboundJsonSchema); } else if (obj instanceof ScheduledEvent) { ScheduledEvent event = (ScheduledEvent) obj; validate(event.getDetail(), inboundJsonSchema); @@ -118,30 +126,32 @@ public Object around(ProceedingJoinPoint pjp, validate(event.getResourceProperties(), inboundJsonSchema); } else if (obj instanceof KinesisEvent) { KinesisEvent event = (KinesisEvent) obj; - event.getRecords() - .forEach(record -> validate(decode(record.getKinesis().getData()), inboundJsonSchema)); + validationResult = validateKinesisEventRecords(event.getRecords(), inboundJsonSchema); } else if (obj instanceof KinesisFirehoseEvent) { KinesisFirehoseEvent event = (KinesisFirehoseEvent) obj; - event.getRecords().forEach(record -> validate(decode(record.getData()), inboundJsonSchema)); + event.getRecords() + .forEach(eventRecord -> validate(decode(eventRecord.getData()), inboundJsonSchema)); } else if (obj instanceof KafkaEvent) { KafkaEvent event = (KafkaEvent) obj; event.getRecords().forEach((s, records) -> records.forEach( - record -> validate(decode(record.getValue()), inboundJsonSchema))); + eventRecord -> validate(decode(eventRecord.getValue()), inboundJsonSchema))); } else if (obj instanceof ActiveMQEvent) { ActiveMQEvent event = (ActiveMQEvent) obj; - event.getMessages().forEach(record -> validate(decode(record.getData()), inboundJsonSchema)); + event.getMessages().forEach(message -> validate(decode(message.getData()), inboundJsonSchema)); } else if (obj instanceof RabbitMQEvent) { RabbitMQEvent event = (RabbitMQEvent) obj; event.getRmqMessagesByQueue().forEach((s, records) -> records.forEach( - record -> validate(decode(record.getData()), inboundJsonSchema))); + message -> validate(decode(message.getData()), inboundJsonSchema))); } else if (obj instanceof KinesisAnalyticsFirehoseInputPreprocessingEvent) { KinesisAnalyticsFirehoseInputPreprocessingEvent event = (KinesisAnalyticsFirehoseInputPreprocessingEvent) obj; - event.getRecords().forEach(record -> validate(decode(record.getData()), inboundJsonSchema)); + event.getRecords() + .forEach(eventRecord -> validate(decode(eventRecord.getData()), inboundJsonSchema)); } else if (obj instanceof KinesisAnalyticsStreamsInputPreprocessingEvent) { KinesisAnalyticsStreamsInputPreprocessingEvent event = (KinesisAnalyticsStreamsInputPreprocessingEvent) obj; - event.getRecords().forEach(record -> validate(decode(record.getData()), inboundJsonSchema)); + event.getRecords() + .forEach(eventRecord -> validate(decode(eventRecord.getData()), inboundJsonSchema)); } else { LOG.warn("Unhandled event type {}, please use the 'envelope' parameter to specify what to validate", obj.getClass().getName()); @@ -149,100 +159,183 @@ record -> validate(decode(record.getData()), inboundJsonSchema))); } } - // don't execute the lambda if result was set by previous validation step + Object result; + + // don't execute the lambda if result was set by previous validation step and should fail fast // in that case result should already hold a response with validation information - if (result != null) { - LOG.error("Incoming API event's body failed inbound schema validation."); + if (failFast && validationResult != null) { + LOG.error("Incoming API event's body failed inbound schema validation."); + return validationResult; + } else { + result = pjp.proceed(proceedArgs); + + if (validationResult != null && result != null) { + // in the case of batches (SQS, Kinesis), we copy the batch item failures to the result + if (result instanceof SQSBatchResponse && validationResult instanceof SQSBatchResponse) { + SQSBatchResponse validationResponse = (SQSBatchResponse) validationResult; + SQSBatchResponse response = (SQSBatchResponse) result; + if (response.getBatchItemFailures() == null) { + response.setBatchItemFailures(validationResponse.getBatchItemFailures()); + } else { + response.getBatchItemFailures().addAll(validationResponse.getBatchItemFailures()); + } + } else if (result instanceof StreamsEventResponse && validationResult instanceof StreamsEventResponse) { + StreamsEventResponse validationResponse = (StreamsEventResponse) validationResult; + StreamsEventResponse response = (StreamsEventResponse) result; + if (response.getBatchItemFailures() == null) { + response.setBatchItemFailures(validationResponse.getBatchItemFailures()); + } else { + response.getBatchItemFailures().addAll(validationResponse.getBatchItemFailures()); + } + } + } + + if (result != null && validationNeeded && !validation.outboundSchema().isEmpty()) { + JsonSchema outboundJsonSchema = getJsonSchema(validation.outboundSchema(), true); + + Object overridenResponse = null; + // The normal behavior of @Validation is to throw an exception if response's validation fails. + // but in the case of APIGatewayProxyResponseEvent and APIGatewayV2HTTPResponse we want to return + // a 400 response with the validation errors instead of throwing an exception. + if (result instanceof APIGatewayProxyResponseEvent) { + APIGatewayProxyResponseEvent response = (APIGatewayProxyResponseEvent) result; + overridenResponse = + validateAPIGatewayProxyBody(response.getBody(), outboundJsonSchema, response.getHeaders(), + response.getMultiValueHeaders()); + } else if (result instanceof APIGatewayV2HTTPResponse) { + APIGatewayV2HTTPResponse response = (APIGatewayV2HTTPResponse) result; + overridenResponse = + validateAPIGatewayV2HTTPBody(response.getBody(), outboundJsonSchema, response.getHeaders(), + response.getMultiValueHeaders()); + // all type of below responses will throw an exception if validation fails + } else if (result instanceof APIGatewayV2WebSocketResponse) { + APIGatewayV2WebSocketResponse response = (APIGatewayV2WebSocketResponse) result; + validate(response.getBody(), outboundJsonSchema); + } else if (result instanceof ApplicationLoadBalancerResponseEvent) { + ApplicationLoadBalancerResponseEvent response = (ApplicationLoadBalancerResponseEvent) result; + validate(response.getBody(), outboundJsonSchema); + } else if (result instanceof KinesisAnalyticsInputPreprocessingResponse) { + KinesisAnalyticsInputPreprocessingResponse response = + (KinesisAnalyticsInputPreprocessingResponse) result; + response.getRecords().forEach(record -> validate(decode(record.getData()), outboundJsonSchema)); + } else { + LOG.warn( + "Unhandled response type {}, please use the 'envelope' parameter to specify what to validate", + result.getClass().getName()); + } + + if (overridenResponse != null) { + result = overridenResponse; + LOG.error("API response failed outbound schema validation."); + } + } } - else { - result = pjp.proceed(proceedArgs); - - if (validationNeeded && !validation.outboundSchema().isEmpty()) { - JsonSchema outboundJsonSchema = getJsonSchema(validation.outboundSchema(), true); - - Object overridenResponse = null; - // The normal behavior of @Validation is to throw an exception if response's validation fails. - // but in the case of APIGatewayProxyResponseEvent and APIGatewayV2HTTPResponse we want to return - // a 400 response with the validation errors instead of throwing an exception. - if (result instanceof APIGatewayProxyResponseEvent) { - APIGatewayProxyResponseEvent response = (APIGatewayProxyResponseEvent) result; - overridenResponse = validateAPIGatewayProxyBody(response.getBody(), outboundJsonSchema, response.getHeaders(), - response.getMultiValueHeaders()); - } else if (result instanceof APIGatewayV2HTTPResponse) { - APIGatewayV2HTTPResponse response = (APIGatewayV2HTTPResponse) result; - overridenResponse = validateAPIGatewayV2HTTPBody(response.getBody(), outboundJsonSchema, response.getHeaders(), - response.getMultiValueHeaders()); - // all type of below responses will throw an exception if validation fails - } else if (result instanceof APIGatewayV2WebSocketResponse) { - APIGatewayV2WebSocketResponse response = (APIGatewayV2WebSocketResponse) result; - validate(response.getBody(), outboundJsonSchema); - } else if (result instanceof ApplicationLoadBalancerResponseEvent) { - ApplicationLoadBalancerResponseEvent response = (ApplicationLoadBalancerResponseEvent) result; - validate(response.getBody(), outboundJsonSchema); - } else if (result instanceof KinesisAnalyticsInputPreprocessingResponse) { - KinesisAnalyticsInputPreprocessingResponse response = - (KinesisAnalyticsInputPreprocessingResponse) result; - response.getRecords().forEach(record -> validate(decode(record.getData()), outboundJsonSchema)); - } else { - LOG.warn("Unhandled response type {}, please use the 'envelope' parameter to specify what to validate", - result.getClass().getName()); - } - - if (overridenResponse != null) { - result = overridenResponse; - LOG.error("API response failed outbound schema validation."); - } - } - } return result; } - + + /** + * Validate each Kinesis record body. If an error occurs, do not fail the whole batch but only add invalid items in BatchItemFailure. + * Note that the valid records will be decoded twice (during validation and within the handler by the user), which will slightly reduce performance. + * @param records Kinesis records + * @param inboundJsonSchema validation schema + * @return the stream response with items in failure + */ + private StreamsEventResponse validateKinesisEventRecords(List records, + JsonSchema inboundJsonSchema) { + StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); + + ListIterator listIterator = records.listIterator(); // using iterator to remove while browsing + while (listIterator.hasNext()) { + KinesisEvent.KinesisEventRecord eventRecord = listIterator.next(); + try { + validate(decode(eventRecord.getKinesis().getData()), inboundJsonSchema); + } catch (ValidationException e) { + LOG.error("Validation error on message {}: {}", eventRecord.getKinesis().getSequenceNumber(), + e.getMessage()); + listIterator.remove(); + response.getBatchItemFailures().add(StreamsEventResponse.BatchItemFailure.builder() + .withItemIdentifier(eventRecord.getKinesis().getSequenceNumber()).build()); + } + } + return response; + } + + /** + * Validate each SQS message body. If an error occurs, do not fail the whole batch but only add invalid items in BatchItemFailure. + * + * @param messages SQS messages + * @param inboundJsonSchema validation schema + * @return the SQS batch response + */ + private SQSBatchResponse validateSQSEventMessages(List messages, + JsonSchema inboundJsonSchema) { + SQSBatchResponse response = SQSBatchResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); + ListIterator listIterator = messages.listIterator(); // using iterator to remove while browsing + while (listIterator.hasNext()) { + SQSEvent.SQSMessage message = listIterator.next(); + try { + validate(message.getBody(), inboundJsonSchema); + } catch (ValidationException e) { + LOG.error("Validation error on message {}: {}", message.getMessageId(), e.getMessage()); + listIterator.remove(); + response.getBatchItemFailures() + .add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier(message.getMessageId()) + .build()); + } + } + return response; + } + /** * Validates the given body against the provided JsonSchema. If validation fails the ValidationException * will be catched and transformed to a 400, bad request, API response - * @param body body of the event to validate - * @param inboundJsonSchema validation schema + * + * @param body body of the event to validate + * @param jsonSchema validation schema * @return null if validation passed, or a 400 response object otherwise */ private APIGatewayProxyResponseEvent validateAPIGatewayProxyBody(final String body, final JsonSchema jsonSchema, - final Map headers, Map> multivalueHeaders) { - APIGatewayProxyResponseEvent result = null; - try { - validate(body, jsonSchema); - } catch (ValidationException e) { - LOG.error("There were validation errors: {}", e.getMessage()); - result = new APIGatewayProxyResponseEvent(); - result.setBody(e.getMessage()); - result.setHeaders(headers == null ? Collections.emptyMap() : headers); - result.setMultiValueHeaders(multivalueHeaders == null ? Collections.emptyMap() : multivalueHeaders); - result.setStatusCode(400); - result.setIsBase64Encoded(false); - } - return result; + final Map headers, + Map> multivalueHeaders) { + APIGatewayProxyResponseEvent result = null; + try { + validate(body, jsonSchema); + } catch (ValidationException e) { + LOG.error("There were validation errors: {}", e.getMessage()); + result = new APIGatewayProxyResponseEvent(); + result.setBody(e.getMessage()); + result.setHeaders(headers == null ? Collections.emptyMap() : headers); + result.setMultiValueHeaders(multivalueHeaders == null ? Collections.emptyMap() : multivalueHeaders); + result.setStatusCode(400); + result.setIsBase64Encoded(false); + } + return result; } - + /** * Validates the given body against the provided JsonSchema. If validation fails the ValidationException * will be catched and transformed to a 400, bad request, API response - * @param body body of the event to validate - * @param inboundJsonSchema validation schema + * + * @param body body of the event to validate + * @param jsonSchema validation schema * @return null if validation passed, or a 400 response object otherwise */ private APIGatewayV2HTTPResponse validateAPIGatewayV2HTTPBody(final String body, final JsonSchema jsonSchema, - final Map headers, Map> multivalueHeaders) { - APIGatewayV2HTTPResponse result = null; - try { - validate(body, jsonSchema); - } catch (ValidationException e) { - LOG.error("There were validation errors: {}", e.getMessage()); - result = new APIGatewayV2HTTPResponse(); - result.setBody(e.getMessage()); - result.setHeaders(headers == null ? Collections.emptyMap() : headers); - result.setMultiValueHeaders(multivalueHeaders == null ? Collections.emptyMap() : multivalueHeaders); - result.setStatusCode(400); - result.setIsBase64Encoded(false); - } - return result; + final Map headers, + Map> multivalueHeaders) { + APIGatewayV2HTTPResponse result = null; + try { + validate(body, jsonSchema); + } catch (ValidationException e) { + LOG.error("There were validation errors: {}", e.getMessage()); + result = new APIGatewayV2HTTPResponse(); + result.setBody(e.getMessage()); + result.setHeaders(headers == null ? Collections.emptyMap() : headers); + result.setMultiValueHeaders(multivalueHeaders == null ? Collections.emptyMap() : multivalueHeaders); + result.setStatusCode(400); + result.setIsBase64Encoded(false); + } + return result; } } diff --git a/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/KinesisHandlerWithError.java b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/KinesisHandlerWithError.java new file mode 100644 index 000000000..e6e702fb6 --- /dev/null +++ b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/KinesisHandlerWithError.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.validation.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import java.util.ArrayList; +import software.amazon.lambda.powertools.validation.Validation; + +public class KinesisHandlerWithError implements RequestHandler { + + @Override + @Validation(inboundSchema = "classpath:/schema_v7.json") + public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { + StreamsEventResponse response = StreamsEventResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); + assert input.getRecords().size() == 2; // invalid messages have been removed from the input + response.getBatchItemFailures().add(StreamsEventResponse.BatchItemFailure.builder().withItemIdentifier("1234").build()); + return response; + } +} diff --git a/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/SQSHandlerWithError.java b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/SQSHandlerWithError.java new file mode 100644 index 000000000..23fceab5b --- /dev/null +++ b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/SQSHandlerWithError.java @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.validation.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import java.util.ArrayList; +import software.amazon.lambda.powertools.validation.Validation; + +public class SQSHandlerWithError implements RequestHandler { + + @Override + @Validation(inboundSchema = "classpath:/schema_v7.json") + public SQSBatchResponse handleRequest(SQSEvent input, Context context) { + SQSBatchResponse response = SQSBatchResponse.builder().withBatchItemFailures(new ArrayList<>()).build(); + assert input.getRecords().size() == 2; // invalid messages have been removed from the input + response.getBatchItemFailures().add(SQSBatchResponse.BatchItemFailure.builder().withItemIdentifier("1234").build()); + return response; + } +} diff --git a/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/StandardKinesisHandler.java b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/StandardKinesisHandler.java new file mode 100644 index 000000000..1afc5c5ec --- /dev/null +++ b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/StandardKinesisHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.validation.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import software.amazon.lambda.powertools.validation.Validation; + +public class StandardKinesisHandler implements RequestHandler { + + @Override + @Validation(inboundSchema = "classpath:/schema_v7.json") + public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { + StreamsEventResponse response = StreamsEventResponse.builder().build(); + assert input.getRecords().size() == 2; // invalid messages have been removed from the input + return response; + } +} diff --git a/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/StandardSQSHandler.java b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/StandardSQSHandler.java new file mode 100644 index 000000000..e0f0ece2d --- /dev/null +++ b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/handlers/StandardSQSHandler.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.validation.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.validation.Validation; + +public class StandardSQSHandler implements RequestHandler { + + @Override + @Validation(inboundSchema = "classpath:/schema_v7.json") + public SQSBatchResponse handleRequest(SQSEvent input, Context context) { + SQSBatchResponse response = SQSBatchResponse.builder().build(); + assert input.getRecords().size() == 2; // invalid messages have been removed from the input + return response; + } +} diff --git a/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/internal/ValidationAspectTest.java b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/internal/ValidationAspectTest.java index 1708ebeeb..42a18307e 100644 --- a/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/internal/ValidationAspectTest.java +++ b/powertools-validation/src/test/java/software/amazon/lambda/powertools/validation/internal/ValidationAspectTest.java @@ -20,24 +20,6 @@ import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.when; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Stream; - -import org.aspectj.lang.ProceedingJoinPoint; -import org.aspectj.lang.Signature; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.ArgumentsSource; -import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; @@ -55,24 +37,45 @@ import com.amazonaws.services.lambda.runtime.events.KinesisFirehoseEvent; import com.amazonaws.services.lambda.runtime.events.RabbitMQEvent; import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.ScheduledEvent; -import com.amazonaws.services.lambda.runtime.serialization.PojoSerializer; -import com.amazonaws.services.lambda.runtime.serialization.events.LambdaEventSerializers; +import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; +import com.amazonaws.services.lambda.runtime.tests.annotations.Event; import com.networknt.schema.SpecVersion; - +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.Signature; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import software.amazon.lambda.powertools.validation.Validation; import software.amazon.lambda.powertools.validation.ValidationConfig; import software.amazon.lambda.powertools.validation.ValidationException; import software.amazon.lambda.powertools.validation.handlers.GenericSchemaV7APIGatewayProxyRequestEventHandler; import software.amazon.lambda.powertools.validation.handlers.GenericSchemaV7StringHandler; +import software.amazon.lambda.powertools.validation.handlers.KinesisHandlerWithError; +import software.amazon.lambda.powertools.validation.handlers.SQSHandlerWithError; import software.amazon.lambda.powertools.validation.handlers.SQSWithCustomEnvelopeHandler; import software.amazon.lambda.powertools.validation.handlers.SQSWithWrongEnvelopeHandler; +import software.amazon.lambda.powertools.validation.handlers.StandardKinesisHandler; +import software.amazon.lambda.powertools.validation.handlers.StandardSQSHandler; import software.amazon.lambda.powertools.validation.handlers.ValidationInboundAPIGatewayV2HTTPEventHandler; import software.amazon.lambda.powertools.validation.model.MyCustomEvent; -public class ValidationAspectTest { +class ValidationAspectTest { @Mock Validation validation; @@ -167,7 +170,7 @@ void testValidateOutboundJsonSchemaWithHandledExceptions(Object object) throws T } @Test - public void testValidateOutboundJsonSchema_APIGWV2() throws Throwable { + void testValidateOutboundJsonSchema_APIGWV2() throws Throwable { when(validation.schemaVersion()).thenReturn(SpecVersion.VersionFlag.V7); when(pjp.getSignature()).thenReturn(signature); when(pjp.getSignature().getDeclaringType()).thenReturn(RequestHandler.class); @@ -187,7 +190,7 @@ public void testValidateOutboundJsonSchema_APIGWV2() throws Throwable { } @Test - public void validate_inputOK_schemaInClasspath_shouldValidate() { + void validate_inputOK_schemaInClasspath_shouldValidate() { GenericSchemaV7APIGatewayProxyRequestEventHandler handler = new GenericSchemaV7APIGatewayProxyRequestEventHandler(); APIGatewayProxyRequestEvent event = new APIGatewayProxyRequestEvent(); event.setBody("{" + @@ -204,7 +207,7 @@ public void validate_inputOK_schemaInClasspath_shouldValidate() { } @Test - public void validate_inputKO_schemaInClasspath_shouldThrowValidationException() { + void validate_inputKO_schemaInClasspath_shouldThrowValidationException() { GenericSchemaV7APIGatewayProxyRequestEventHandler handler = new GenericSchemaV7APIGatewayProxyRequestEventHandler(); Map headers = new HashMap<>(); @@ -232,7 +235,7 @@ public void validate_inputKO_schemaInClasspath_shouldThrowValidationException() } @Test - public void validate_inputOK_schemaInString_shouldValidate() { + void validate_inputOK_schemaInString_shouldValidate() { ValidationInboundAPIGatewayV2HTTPEventHandler handler = new ValidationInboundAPIGatewayV2HTTPEventHandler(); APIGatewayV2HTTPEvent event = new APIGatewayV2HTTPEvent(); event.setBody("{" + @@ -248,7 +251,7 @@ public void validate_inputOK_schemaInString_shouldValidate() { @Test - public void validate_inputKO_schemaInString_shouldThrowValidationException() { + void validate_inputKO_schemaInString_shouldThrowValidationException() { ValidationInboundAPIGatewayV2HTTPEventHandler handler = new ValidationInboundAPIGatewayV2HTTPEventHandler(); Map headers = new HashMap<>(); @@ -268,49 +271,77 @@ public void validate_inputKO_schemaInString_shouldThrowValidationException() { assertThat(response.getMultiValueHeaders()).isEmpty(); } - @Test - public void validate_SQS() { - PojoSerializer pojoSerializer = - LambdaEventSerializers.serializerFor(SQSEvent.class, ClassLoader.getSystemClassLoader()); - SQSEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/sqs.json")); - + @ParameterizedTest + @Event(value = "sqs.json", type = SQSEvent.class) + void validate_SQS(SQSEvent event) { GenericSchemaV7StringHandler handler = new GenericSchemaV7StringHandler<>(); assertThat(handler.handleRequest(event, context)).isEqualTo("OK"); } - @Test - public void validate_SQS_CustomEnvelopeTakePrecedence() { - PojoSerializer pojoSerializer = - LambdaEventSerializers.serializerFor(SQSEvent.class, ClassLoader.getSystemClassLoader()); - SQSEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/sqs_message.json")); + @ParameterizedTest + @Event(value = "sqs_invalid_messages.json", type = SQSEvent.class) + void validate_SQS_with_validation_partial_failure(SQSEvent event) { + StandardSQSHandler handler = new StandardSQSHandler(); + SQSBatchResponse response = handler.handleRequest(event, context); + assertThat(response.getBatchItemFailures()).hasSize(2); + assertThat(response.getBatchItemFailures().stream().map(SQSBatchResponse.BatchItemFailure::getItemIdentifier).collect( + Collectors.toList())).contains("d9144555-9a4f-4ec3-99a0-fc4e625a8db3", "d9144555-9a4f-4ec3-99a0-fc4e625a8db5"); + } + + @ParameterizedTest + @Event(value = "sqs_invalid_messages.json", type = SQSEvent.class) + void validate_SQS_with_partial_failure(SQSEvent event) { + SQSHandlerWithError handler = new SQSHandlerWithError(); + SQSBatchResponse response = handler.handleRequest(event, context); + assertThat(response.getBatchItemFailures()).hasSize(3); + assertThat(response.getBatchItemFailures().stream().map(SQSBatchResponse.BatchItemFailure::getItemIdentifier).collect( + Collectors.toList())).contains("d9144555-9a4f-4ec3-99a0-fc4e625a8db3", "d9144555-9a4f-4ec3-99a0-fc4e625a8db5", "1234"); + } + @ParameterizedTest + @Event(value = "sqs_message.json", type = SQSEvent.class) + void validate_SQS_CustomEnvelopeTakePrecedence(SQSEvent event) { SQSWithCustomEnvelopeHandler handler = new SQSWithCustomEnvelopeHandler(); assertThat(handler.handleRequest(event, context)).isEqualTo("OK"); } - @Test - public void validate_SQS_WrongEnvelope_shouldThrowValidationException() { - PojoSerializer pojoSerializer = - LambdaEventSerializers.serializerFor(SQSEvent.class, ClassLoader.getSystemClassLoader()); - SQSEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/sqs_message.json")); - + @ParameterizedTest + @Event(value = "sqs_message.json", type = SQSEvent.class) + void validate_SQS_WrongEnvelope_shouldThrowValidationException(SQSEvent event) { SQSWithWrongEnvelopeHandler handler = new SQSWithWrongEnvelopeHandler(); assertThatExceptionOfType(ValidationException.class).isThrownBy(() -> handler.handleRequest(event, context)); } - @Test - public void validate_Kinesis() { - PojoSerializer pojoSerializer = - LambdaEventSerializers.serializerFor(KinesisEvent.class, ClassLoader.getSystemClassLoader()); - KinesisEvent event = pojoSerializer.fromJson(this.getClass().getResourceAsStream("/kinesis.json")); - + @ParameterizedTest + @Event(value = "kinesis.json", type = KinesisEvent.class) + void validate_Kinesis(KinesisEvent event) { GenericSchemaV7StringHandler handler = new GenericSchemaV7StringHandler<>(); assertThat(handler.handleRequest(event, context)).isEqualTo("OK"); } + @ParameterizedTest + @Event(value = "kinesis_invalid_messages.json", type = KinesisEvent.class) + void validate_Kinesis_with_validation_partial_failure(KinesisEvent event) { + StandardKinesisHandler handler = new StandardKinesisHandler(); + StreamsEventResponse response = handler.handleRequest(event, context); + assertThat(response.getBatchItemFailures()).hasSize(2); + assertThat(response.getBatchItemFailures().stream().map(StreamsEventResponse.BatchItemFailure::getItemIdentifier).collect( + Collectors.toList())).contains("49545115243490985018280067714973144582180062593244200962", "49545115243490985018280067714973144582180062593244200964"); + } + + @ParameterizedTest + @Event(value = "kinesis_invalid_messages.json", type = KinesisEvent.class) + void validate_Kinesis_with_partial_failure(KinesisEvent event) { + KinesisHandlerWithError handler = new KinesisHandlerWithError(); + StreamsEventResponse response = handler.handleRequest(event, context); + assertThat(response.getBatchItemFailures()).hasSize(3); + assertThat(response.getBatchItemFailures().stream().map(StreamsEventResponse.BatchItemFailure::getItemIdentifier).collect( + Collectors.toList())).contains("49545115243490985018280067714973144582180062593244200962", "49545115243490985018280067714973144582180062593244200964", "1234"); + } + @ParameterizedTest @MethodSource("provideEventAndEventType") - public void validateEEvent(String jsonResource, Class eventClass) throws IOException { + void validateEEvent(String jsonResource, Class eventClass) throws IOException { Object event = ValidationConfig.get().getObjectMapper() .readValue(this.getClass().getResourceAsStream(jsonResource), eventClass); diff --git a/powertools-validation/src/test/resources/kinesis_invalid_messages.json b/powertools-validation/src/test/resources/kinesis_invalid_messages.json new file mode 100644 index 000000000..3d805c4dd --- /dev/null +++ b/powertools-validation/src/test/resources/kinesis_invalid_messages.json @@ -0,0 +1,72 @@ +{ + "Records": [ + { + "kinesis": { + "partitionKey": "partitionKey-03", + "kinesisSchemaVersion": "1.0", + "data": "ewogICJpZCI6IDQzMjQyLAogICJuYW1lIjogIkZvb0JhciBYWSIsCiAgInByaWNlIjogMjU4Cn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200961", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-04", + "kinesisSchemaVersion": "1.0", + "data": "ewogICJpZCI6InN0cmluZ0lkIiwKICAibmFtZSI6ICJGb29CYXIgWFkiLAogICJwcmljZSI6IDI1OAp9", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200962", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-04", + "kinesisSchemaVersion": "1.0", + "data": "ewogICJpZCI6IDQyNSwKICAibmFtZSI6ICJCYXJGb28iLAogICJwcmljZSI6IDQzCn0=", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200963", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + }, + { + "kinesis": { + "partitionKey": "partitionKey-04", + "kinesisSchemaVersion": "1.0", + "data": "ewogICJpZCI6MTIzNCwKICAibmFtZSI6ICJGb28iLAogICJwcmljZSI6IDI1OAp9", + "sequenceNumber": "49545115243490985018280067714973144582180062593244200964", + "approximateArrivalTimestamp": 1428537600, + "encryptionType": "NONE" + }, + "eventSource": "aws:kinesis", + "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961", + "invokeIdentityArn": "arn:aws:iam::EXAMPLE", + "eventVersion": "1.0", + "eventName": "aws:kinesis:record", + "eventSourceARN": "arn:aws:kinesis:EXAMPLE", + "awsRegion": "eu-central-1" + } + ] +} diff --git a/powertools-validation/src/test/resources/sqs_invalid_messages.json b/powertools-validation/src/test/resources/sqs_invalid_messages.json new file mode 100644 index 000000000..aaec54bfd --- /dev/null +++ b/powertools-validation/src/test/resources/sqs_invalid_messages.json @@ -0,0 +1,72 @@ +{ + "Records": [ + { + "messageId": "d9144555-9a4f-4ec3-99a0-fc4e625a8db2", + "receiptHandle": "7kam5bfzbDsjtcjElvhSbxeLJbeey3A==", + "body": "{\n \"id\": 43242,\n \"name\": \"FooBar XY\",\n \"price\": 258\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975709495", + "SenderId": "AROAIFU457DVZ5L2J53F2", + "ApproximateFirstReceiveTimestamp": "1601975709499" + }, + "messageAttributes": { + }, + "md5OfBody": "0f96e88a291edb4429f2f7b9fdc3df96", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-fc4e625a8db3", + "receiptHandle": "7kam5bfzbDsjtcjElvhSbxeLJbeey3A==", + "body": "{\n \"id\": 43245,\n \"name\": \"Foo\",\n \"price\": 258\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975709495", + "SenderId": "AROAIFU457DVZ5L2J53F2", + "ApproximateFirstReceiveTimestamp": "1601975709499" + }, + "messageAttributes": { + }, + "md5OfBody": "0f96e88a291edb4429f2f7b9fdc3df96", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-fc4e625a8db4", + "receiptHandle": "7kam5bfzbDsjtcjElvhSbxeLJbeey3A==", + "body": "{\n \"id\": 43246,\n \"name\": \"FooBar XYZ\",\n \"price\": 258\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975709495", + "SenderId": "AROAIFU457DVZ5L2J53F2", + "ApproximateFirstReceiveTimestamp": "1601975709499" + }, + "messageAttributes": { + }, + "md5OfBody": "0f96e88a291edb4429f2f7b9fdc3df96", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + }, + { + "messageId": "d9144555-9a4f-4ec3-99a0-fc4e625a8db5", + "receiptHandle": "7kam5bfzbDsjtcjElvhSbxeLJbeey3A==", + "body": "{\n \"id\": \"stringId\",\n \"name\": \"FooBar XY\",\n \"price\": 258\n}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1601975709495", + "SenderId": "AROAIFU457DVZ5L2J53F2", + "ApproximateFirstReceiveTimestamp": "1601975709499" + }, + "messageAttributes": { + }, + "md5OfBody": "0f96e88a291edb4429f2f7b9fdc3df96", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-central-1:123456789012:TestLambda", + "awsRegion": "eu-central-1" + } + ] +} \ No newline at end of file