Skip to content

Commit

Permalink
feat: SpringCloudStreams3Generator adds EnterpriseEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
ivangsa committed Dec 7, 2022
1 parent 282a102 commit 95b8dbb
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
import io.zenwave360.generator.templating.TemplateInput;
import io.zenwave360.generator.templating.TemplateOutput;
import io.zenwave360.generator.utils.JSONPath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpringCloudStreams3Generator extends AbstractAsyncapiGenerator {

private Logger log = LoggerFactory.getLogger(getClass());

public enum TransactionalOutboxType {
none, mongodb, jdbc
}
Expand All @@ -27,9 +31,15 @@ public enum TransactionalOutboxType {
@DocumentedOption(description = "Transactional outbox type for message producers.")
public TransactionalOutboxType transactionalOutbox = TransactionalOutboxType.none;

@DocumentedOption(description = "Whether to expose underlying spring Message to consumers or not. Default: false")
@DocumentedOption(description = "Whether to expose underlying spring Message to consumers or not.")
public boolean exposeMessage = false;

@DocumentedOption(description = "Include support for enterprise envelop wrapping/unwrapping.")
public boolean useEnterpriseEnvelope = false;

@DocumentedOption(description = "AsyncAPI Message extension name for the envelop java type for wrapping/unwrapping.")
public String envelopeJavaTypeExtensionName = "x-envelope-java-type";

@DocumentedOption(description = "To avoid method erasure conflicts, when exposeMessage or reactive style this character will be used as separator to append message payload type to method names in consumer interfaces.")
public String methodAndMessageSeparator = "$";

Expand Down Expand Up @@ -60,8 +70,24 @@ public SpringCloudStreams3Generator withSourceProperty(String sourceProperty) {
});
handlebarsEngine.getHandlebars().registerHelper("messageType", (operation, options) -> {
List<String> messageTypes = JSONPath.get(operation, "$.x--messages[*].x--javaType");
List<String> envelopTypes = JSONPath.get(operation, "$.x--messages[*]." + envelopeJavaTypeExtensionName);
String operationEnvelop = JSONPath.get(operation, "$." + envelopeJavaTypeExtensionName);
if(operationEnvelop != null) {
envelopTypes.add(operationEnvelop);
}
if(useEnterpriseEnvelope && !envelopTypes.isEmpty()) {
return envelopTypes.size() == 1 ? envelopTypes.get(0) : "Object";
}
return messageTypes.size() == 1 ? messageTypes.get(0) : "Object";
});
handlebarsEngine.getHandlebars().registerHelper("hasEnterpriseEnvelope", (operation, options) -> {
List<String> envelopTypes = JSONPath.get(operation, "$.x--messages[*]." + envelopeJavaTypeExtensionName);
String operationEnvelop = JSONPath.get(operation, "$." + envelopeJavaTypeExtensionName);
if(operationEnvelop != null) {
envelopTypes.add(operationEnvelop);
}
return useEnterpriseEnvelope && !envelopTypes.isEmpty();
});
handlebarsEngine.getHandlebars().registerHelper("serviceName", (context, options) -> {
return String.format("%s%s%s", servicePrefix, context, serviceSuffix);
});
Expand Down Expand Up @@ -120,12 +146,10 @@ public List<TemplateOutput> generate(Map<String, Object> contextModel) {
Map<String, List<Map<String, Object>>> subscribeOperations = getSubscribeOperationsGroupedByTag(apiModel);
Map<String, List<Map<String, Object>>> publishOperations = getPublishOperationsGroupedByTag(apiModel);
for (Map.Entry<String, List<Map<String, Object>>> entry : subscribeOperations.entrySet()) {
// boolean isProducer = isProducer(role, OperationType.SUBSCRIBE);
OperationRoleType operationRoleType = OperationRoleType.valueOf(role, AsyncapiOperationType.subscribe);
templateOutputList.addAll(generateTemplateOutput(contextModel, entry.getKey(), entry.getValue(), operationRoleType));
}
for (Map.Entry<String, List<Map<String, Object>>> entry : publishOperations.entrySet()) {
// boolean isProducer = isProducer(role, OperationType.PUBLISH);
OperationRoleType operationRoleType = OperationRoleType.valueOf(role, AsyncapiOperationType.publish);
templateOutputList.addAll(generateTemplateOutput(contextModel, entry.getKey(), entry.getValue(), operationRoleType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Consu

protected {{serviceName operation.x--operationIdCamelCase}} service;
protected StreamBridge streamBridge;

protected Map<Class<? extends Exception>, String> errorQueueMap;
{{~#if useEnterpriseEnvelope}}
public EnvelopeUnWrapper envelopeUnWrapper;
{{~/if}}

public {{consumerName operation.x--operationIdCamelCase}}({{serviceName operation.x--operationIdCamelCase}} service, @Autowired(required=false) StreamBridge streamBridge) {
this.service = service;
Expand All @@ -39,13 +41,18 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Consu
public void setErrorQueueMap(Map<Class<? extends Exception>, String> errorQueueMap) {
this.errorQueueMap = errorQueueMap;
}

{{~#if useEnterpriseEnvelope}}
@Autowired(required = false)
public void setEnvelopeUnWrapper(EnvelopeUnWrapper envelopeUnWrapper) {
this.envelopeUnWrapper = envelopeUnWrapper;
}
{{~/if}}

@Override
public void accept(Message<{{messageType operation}}> message) {
log.debug("Received message: {}", message);
try {
Object payload = message.getPayload();
Object payload = {{#if (hasEnterpriseEnvelope operation)}}unwrap(message.getPayload()){{else}}message.getPayload(){{/if}};
{{~#each operation.x--messages as |message|}}
if(payload instanceof {{message.x--javaType}}) {
{{~#if exposeMessage}}
Expand Down Expand Up @@ -95,4 +102,17 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Consu
}
return null;
}

{{~#if useEnterpriseEnvelope}}
protected Object unwrap(Object payload) {
if(envelopeUnWrapper != null) {
return envelopeUnWrapper.unwrap(payload);
}
return payload;
}

public interface EnvelopeUnWrapper {
public Object unwrap(Object payload);
}
{{/if}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,24 @@ public class {{apiClassName}} implements I{{apiClassName}} {
protected Logger log = LoggerFactory.getLogger(getClass());

protected StreamBridge streamBridge;
{{~#each operations as |operation|}}
public String {{operation.operationId}}BindingName = "{{operation.x--operationIdKebabCase}}-out{{bindingSuffix}}";
{{~/each}}

{{~#if useEnterpriseEnvelope}}
public EnvelopeWrapper envelopeWrapper;
{{~/if}}

public {{apiClassName}}(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}

{{~#each operations as |operation|}}
public String {{operation.operationId}}BindingName = "{{operation.x--operationIdKebabCase}}-out{{bindingSuffix}}";
{{/each}}
{{~#if useEnterpriseEnvelope}}
@Autowired(required = false)
public void setEnvelopeWrapper(EnvelopeWrapper envelopeWrapper) {
this.envelopeWrapper = envelopeWrapper;
}
{{~/if}}

{{~#each operations as |operation|}}
{{#each operation.x--messages as |message|}}
Expand All @@ -40,11 +50,23 @@ public class {{apiClassName}} implements I{{apiClassName}} {
*/
public boolean {{operation.operationId}}({{message.x--javaType}} payload, {{message.x--javaTypeSimpleName}}Headers headers) {
log.debug("Sending message to topic: {}", {{operation.operationId}}BindingName);
Message message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
Message message = MessageBuilder.createMessage({{#if (hasEnterpriseEnvelope operation)}}wrap(payload){{else}}payload{{/if}}, new MessageHeaders(headers));
return streamBridge.send({{operation.operationId}}BindingName, message);
}

{{/each}}
{{/each}}

{{~#if useEnterpriseEnvelope}}
protected Object wrap(Object payload) {
if(envelopeWrapper != null) {
return envelopeWrapper.wrap(payload);
}
return payload;
}

public interface EnvelopeWrapper {
public Object wrap(Object payload);
}
{{/if}}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ public void test_generator_provider_for_events() throws Exception {
Assertions.assertTrue(logs.contains("Writing template with targetFile: src/main/java/io/example/integration/test/api/provider_for_events/DefaultServiceEventsProducer.java"));
}

@Test
public void test_generator_provider_for_events_with_envelope() throws Exception {
Plugin plugin = new SpringCloudStream3Plugin()
.withSpecFile("classpath:io/zenwave360/generator/resources/asyncapi/asyncapi-events.yml")
.withTargetFolder("target/zenwave630/out")
.withOption("apiPackage", "io.example.integration.test.api.client_for_events_with_envelope")
.withOption("modelPackage", "io.example.integration.test.api.model")
.withOption("useEnterpriseEnvelope", true)
.withOption("operationIds", List.of("onProductCreated"))
.withOption("role", AsyncapiRoleType.client)
.withOption("style", ProgrammingStyle.imperative);

new MainGenerator().generate(plugin);

List<String> logs = logCaptor.getLogs();
Assertions.assertTrue(logs.contains("Writing template with targetFile: src/main/java/io/example/integration/test/api/client_for_events_with_envelope/OnProductCreatedConsumer.java"));
Assertions.assertTrue(logs.contains("Writing template with targetFile: src/main/java/io/example/integration/test/api/client_for_events_with_envelope/IOnProductCreatedConsumerService.java"));
}

@Test
public void test_generator_provider_for_commands_imperative() throws Exception {
Plugin plugin = new SpringCloudStream3Plugin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Map<String, List<Map<String, Object>>> getOperationsGroupedByTag(Model ap

public boolean matchesFilters(Map<String, Object> operation, AsyncapiOperationType operationType) {
var operationOperationType = AsyncapiOperationType.valueOf(operation.get("x--operationType").toString());
return operationOperationType == operationType && matchesBindingTypes(operation, bindingTypes);
return operationOperationType == operationType && matchesBindingTypes(operation, bindingTypes) && !isSkipOperation(operation);
}

/**
Expand Down Expand Up @@ -122,6 +122,14 @@ public boolean isProducer(Map<String, Object> operation) {
return isProducer(this.role, operationType);
}

public boolean isSkipOperation(Map<String, Object> operation) {
if(operationIds == null || operationIds.isEmpty()) {
return false;
}
return !operationIds.contains((String) operation.get("operationId"));
}


/**
* Returns true for a provider and operation is an event(operationType=publish) or a client and operation is a command(operationType=subscribe).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ channels:
publish:
summary: On new Product (Async Event)
operationId: onProductCreated
x-envelope-java-type: io.zenwave360.generator.plugins.envelope.Envelope
message:
$ref: "#/components/messages/createProductMsg"
bindings:
Expand Down

0 comments on commit 95b8dbb

Please sign in to comment.