Skip to content

Commit

Permalink
adds support for 'consumer' or 'function' as implementation for Sprin…
Browse files Browse the repository at this point in the history
…g Cloud Streams listener implementation.
  • Loading branch information
ivangsa committed May 25, 2024
1 parent c97d317 commit 45747f8
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 40 deletions.
61 changes: 31 additions & 30 deletions plugins/asyncapi-spring-cloud-streams3/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public enum TransactionalOutboxType {
none, mongodb, jdbc
}

public enum FunctionalInterfaceImplementation {
consumer, function
}

public String sourceProperty = "api";

@DocumentedOption(description = "Programming style")
Expand Down Expand Up @@ -68,6 +72,9 @@ public enum TransactionalOutboxType {
@DocumentedOption(description = "Spring-Boot binding suffix. It will be appended to the operation name kebab-cased. E.g. <operation-id>-in-0")
public String bindingSuffix = "-0";

@DocumentedOption(description = "Whether to use a 'java.util.function.Consumer' or 'java.util.function.Function' as the functional interface implementation.")
public FunctionalInterfaceImplementation functionalInterfaceImplementation = FunctionalInterfaceImplementation.consumer;

@DocumentedOption(description = "AsyncAPI extension property name for runtime auto-configuration of headers.")
public String runtimeHeadersProperty = "x-runtime-expression";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
Expand All @@ -24,7 +25,13 @@ import {{modelPackage}}.*;

@Component("{{bindingPrefix}}{{operation.x--operationIdKebabCase}}")
@jakarta.annotation.Generated(value = "io.zenwave360.sdk.plugins.SpringCloudStreams3Plugin", date = "{{date}}")
public class {{consumerName operation.x--operationIdCamelCase}} implements Function<Message<{{messageType operation}}>, Void> {
public class {{consumerName operation.x--operationIdCamelCase}} implements
{{~#if (eq functionalInterfaceImplementation 'function')~}}
Function<Message<{{messageType operation}}>, Void>
{{~else~}}
Consumer<Message<{{messageType operation}}>>
{{~/if~}}
{

protected Logger log = LoggerFactory.getLogger(getClass());

Expand All @@ -51,7 +58,13 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Funct
}
{{~/if}}

@Override
// NOTE: implementing both accept and apply facilitates code generation.
// See what this class actually implements.

public void accept(Message<{{messageType operation}}> message) {
apply(message);
}

public Void apply(Message<{{messageType operation}}> message) {
log.debug("Received message: {}", message);
try {
Expand Down Expand Up @@ -86,12 +99,14 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Funct
if (streamBridge != null && resolvedDLQ != null) {
try {
log.debug("Sending message to dead letter queue: {}", resolvedDLQ);
Object payload = message.getPayload();
var headers = new HashMap(message.getHeaders());
headers.put("x-original-headers", message.getHeaders().toString());
headers.put("x-exception-type", e.getClass().getName());
headers.put("x-exception-message", e.getMessage());
headers.put("x-exception-stacktrace ", getStackTraceAsString(e));
headers.put("x-exception-payload-type", message.getPayload().getClass().getName());
streamBridge.send(resolvedDLQ, MessageBuilder.createMessage(message.getPayload(), new MessageHeaders(headers)));
headers.put("x-exception-payload-type", payload.getClass().getName());
streamBridge.send(resolvedDLQ, MessageBuilder.createMessage(payload, new MessageHeaders(headers)));
return null;
} catch (Exception e1) {
log.error("Error sending message to dead letter queue: {}", resolvedDLQ, e1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ public interface {{serviceInterfaceName operation.x--operationIdCamelCase}} {
* Default method for handling unknown messages or tombstone records (null record values).
*/
{{~#if exposeMessage}}
default void defaultHandler(Message msg) {};
default void defaultHandler(Message msg) {
var payload = msg.getPayload();
throw new UnsupportedOperationException("Payload type not supported: " + (payload != null? payload.getClass().getName() : null));
};
{{~else}}
default void defaultHandler(Object payload, Map<String, Object> headers) {};
default void defaultHandler(Object payload, Map<String, Object> headers) {
throw new UnsupportedOperationException("Payload type not supported: " + (payload != null? payload.getClass().getName() : null));
};
{{~/if}}

{{#each messages as |message|}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public boolean isProducer() {
@DocumentedOption(description = "Operation ids to include in code generation. Generates code for ALL if left empty")
public List<String> operationIds = new ArrayList<>();

@DocumentedOption(description = "Operation ids to include in code generation. Generates code for ALL if left empty")
public List<String> excludeOperationIds = new ArrayList<>();

public Map<String, List<Map<String, Object>>> getPublishOperationsGroupedByTag(Model apiModel) {
return getOperationsGroupedByTag(apiModel, AsyncapiOperationType.publish);
}
Expand Down Expand Up @@ -155,10 +158,15 @@ public boolean isProducer(Map<String, Object> operation) {
}

public boolean isSkipOperation(Map<String, Object> operation) {
if(operationIds == null || operationIds.isEmpty()) {
return false;
boolean isIncluded = true;
if(operationIds != null && !operationIds.isEmpty()) {
isIncluded = operationIds.contains((String) operation.get("operationId"));
}
boolean isExcluded = false;
if(excludeOperationIds != null && !excludeOperationIds.isEmpty()) {
isExcluded = excludeOperationIds.contains((String) operation.get("operationId"));
}
return !operationIds.contains((String) operation.get("operationId"));
return !isIncluded || isExcluded;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public List<TemplateOutput> generate(Map<String, Object> contextModel) {
}

for (TemplateInput template : templates.allServicesTemplates) {
templateOutputList.addAll(generateTemplateOutput(contextModel, template, Map.of("services", servicesList, "entities", entities.values())));
templateOutputList.addAll(generateTemplateOutput(contextModel, template, Map.of("services", servicesList, "entities", new ArrayList(entities.values()))));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,31 @@ public void test_filter_operations_for_provider_nobindings() throws Exception {
Assertions.assertTrue(producerOperations.isEmpty());
}

@Test
public void test_filter_operations_for_provider_nobindings_includes() throws Exception {
Model model = loadAsyncapiModelFromResource("classpath:io/zenwave360/sdk/resources/asyncapi/v2/asyncapi-circular-refs.yml");
AbstractAsyncapiGenerator asyncapiGenerator = newAbstractAsyncapiGenerator();
asyncapiGenerator.role = AsyncapiRoleType.provider;
asyncapiGenerator.operationIds = Arrays.asList("doCreateProduct");
Map<String, List<Map<String, Object>>> consumerOperations = asyncapiGenerator.getSubscribeOperationsGroupedByTag(model);
Map<String, List<Map<String, Object>>> producerOperations = asyncapiGenerator.getPublishOperationsGroupedByTag(model);
Assertions.assertEquals(1, consumerOperations.size());
Assertions.assertEquals("doCreateProduct", consumerOperations.get("DefaultService").get(0).get("operationId"));
Assertions.assertTrue(producerOperations.isEmpty());
}

@Test
public void test_filter_operations_for_provider_nobindings_excludes() throws Exception {
Model model = loadAsyncapiModelFromResource("classpath:io/zenwave360/sdk/resources/asyncapi/v2/asyncapi-circular-refs.yml");
AbstractAsyncapiGenerator asyncapiGenerator = newAbstractAsyncapiGenerator();
asyncapiGenerator.role = AsyncapiRoleType.provider;
asyncapiGenerator.excludeOperationIds = Arrays.asList("doCreateProduct");
Map<String, List<Map<String, Object>>> consumerOperations = asyncapiGenerator.getSubscribeOperationsGroupedByTag(model);
Map<String, List<Map<String, Object>>> producerOperations = asyncapiGenerator.getPublishOperationsGroupedByTag(model);
Assertions.assertEquals(0, consumerOperations.size());
}


@Test
public void test_filter_operations_for_provider_nobindings_v3() throws Exception {
Model model = loadAsyncapiModelFromResource("classpath:io/zenwave360/sdk/resources/asyncapi/v3/customer-order-asyncapi.yml");
Expand Down

0 comments on commit 45747f8

Please sign in to comment.