Skip to content

Commit

Permalink
Merge pull request #31 from ZenWave360/develop
Browse files Browse the repository at this point in the history
Mergin develop for version 1.5.3
  • Loading branch information
ivangsa authored May 25, 2024
2 parents 308ab7e + 1d8f838 commit becf53a
Show file tree
Hide file tree
Showing 32 changed files with 415 additions and 251 deletions.
61 changes: 31 additions & 30 deletions plugins/asyncapi-spring-cloud-streams3/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public enum TransactionalOutboxType {
none, mongodb, jdbc
}

public enum FunctionalInterfaceImplementation {
consumer, function
}

public String sourceProperty = "api";

@DocumentedOption(description = "Programming style")
Expand Down Expand Up @@ -68,6 +72,9 @@ public enum TransactionalOutboxType {
@DocumentedOption(description = "Spring-Boot binding suffix. It will be appended to the operation name kebab-cased. E.g. <operation-id>-in-0")
public String bindingSuffix = "-0";

@DocumentedOption(description = "Whether to use a 'java.util.function.Consumer' or 'java.util.function.Function' as the functional interface implementation.")
public FunctionalInterfaceImplementation functionalInterfaceImplementation = FunctionalInterfaceImplementation.consumer;

@DocumentedOption(description = "AsyncAPI extension property name for runtime auto-configuration of headers.")
public String runtimeHeadersProperty = "x-runtime-expression";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
Expand All @@ -24,7 +25,13 @@ import {{modelPackage}}.*;

@Component("{{bindingPrefix}}{{operation.x--operationIdKebabCase}}")
@jakarta.annotation.Generated(value = "io.zenwave360.sdk.plugins.SpringCloudStreams3Plugin", date = "{{date}}")
public class {{consumerName operation.x--operationIdCamelCase}} implements Function<Message<{{messageType operation}}>, Void> {
public class {{consumerName operation.x--operationIdCamelCase}} implements
{{~#if (eq functionalInterfaceImplementation 'function')~}}
Function<Message<{{messageType operation}}>, Void>
{{~else~}}
Consumer<Message<{{messageType operation}}>>
{{~/if~}}
{

protected Logger log = LoggerFactory.getLogger(getClass());

Expand All @@ -51,7 +58,13 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Funct
}
{{~/if}}

@Override
// NOTE: implementing both accept and apply facilitates code generation.
// See what this class actually implements.

public void accept(Message<{{messageType operation}}> message) {
apply(message);
}

public Void apply(Message<{{messageType operation}}> message) {
log.debug("Received message: {}", message);
try {
Expand Down Expand Up @@ -86,12 +99,14 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Funct
if (streamBridge != null && resolvedDLQ != null) {
try {
log.debug("Sending message to dead letter queue: {}", resolvedDLQ);
Object payload = message.getPayload();
var headers = new HashMap(message.getHeaders());
headers.put("x-original-headers", message.getHeaders().toString());
headers.put("x-exception-type", e.getClass().getName());
headers.put("x-exception-message", e.getMessage());
headers.put("x-exception-stacktrace ", getStackTraceAsString(e));
headers.put("x-exception-payload-type", message.getPayload().getClass().getName());
streamBridge.send(resolvedDLQ, MessageBuilder.createMessage(message.getPayload(), new MessageHeaders(headers)));
headers.put("x-exception-payload-type", payload.getClass().getName());
streamBridge.send(resolvedDLQ, MessageBuilder.createMessage(payload, new MessageHeaders(headers)));
return null;
} catch (Exception e1) {
log.error("Error sending message to dead letter queue: {}", resolvedDLQ, e1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ public interface {{serviceInterfaceName operation.x--operationIdCamelCase}} {
* Default method for handling unknown messages or tombstone records (null record values).
*/
{{~#if exposeMessage}}
default void defaultHandler(Message msg) {};
default void defaultHandler(Message msg) {
var payload = msg.getPayload();
throw new UnsupportedOperationException("Payload type not supported: " + (payload != null? payload.getClass().getName() : null));
};
{{~else}}
default void defaultHandler(Object payload, Map<String, Object> headers) {};
default void defaultHandler(Object payload, Map<String, Object> headers) {
throw new UnsupportedOperationException("Payload type not supported: " + (payload != null? payload.getClass().getName() : null));
};
{{~/if}}

{{#each messages as |message|}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ public Collection<String> findAggregateInputs(Map aggregate, Options options) {
return new HashSet<>(JSONPath.get(aggregate, "$.commands[*].parameter", List.of()));
}

public Collection<Map> findAggregates(Collection<Map> entities, Options options) {
return entities.stream().filter(entity -> isAggregate((String) entity.get("name"), options)).collect(Collectors.toList());
}

public boolean isAggregate(String entityName, Options options) {
var zdl = options.get("zdl");
var isAggregateRoot = JSONPath.get(zdl, "$.entities." + entityName + "[?(@.options.aggregate == true)]", List.of());
var aggregateName = findEntityAggregate(entityName, options);
return !isAggregateRoot.isEmpty() || aggregateName != null;
}

public String findEntityAggregate(String entityName, Options options) {
var zdl = options.get("zdl");
var aggregateNames = JSONPath.get(zdl, "$.aggregates[*][?(@.aggregateRoot == '" + entityName + "')].name", List.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ protected ZDLProjectTemplates configureProjectTemplates() {
"{{asPackageFolder configPackage}}/TestDataLoader.java", JAVA, null, true);
ts.addTemplate(ts.singleTemplates, "src/test/java", "config/DockerComposeInitializer-{{persistence}}.java", "{{mavenModulesPrefix}}-core-impl",
"{{asPackageFolder configPackage}}/DockerComposeInitializer.java", JAVA, null, true);
ts.addTemplate(ts.singleTemplates, "src/test/java", "config/TestDataLoader-{{persistence}}.java", "{{mavenModulesPrefix}}-infra",
"{{asPackageFolder configPackage}}/TestDataLoader.java", JAVA, null, true);
ts.addTemplate(ts.singleTemplates, "src/test/java", "config/DockerComposeInitializer-{{persistence}}.java", "{{mavenModulesPrefix}}-infra",
"{{asPackageFolder configPackage}}/DockerComposeInitializer.java", JAVA, null, true);

ts.addTemplate(ts.singleTemplates, "src/main/java", "core/inbound/dtos/package-info.java", "{{mavenModulesPrefix}}-domain",
"{{asPackageFolder inboundDtosPackage}}/package-info.java", JAVA, null, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ public {{abstractClass entity}} class {{entity.className}} {{addExtends entity}}
{{~#if (eq relationship.type 'OneToOne')}}
public {{entity.className}} set{{capitalize relationship.fieldName}}({{{relationshipFieldType relationship}}} {{relationship.fieldName}}) {
this.{{relationship.fieldName}} = {{relationship.fieldName}};
{{relationship.fieldName}}.set{{capitalize relationship.otherEntityFieldName}}(this);
if ({{relationship.fieldName}} != null) {
{{relationship.fieldName}}.set{{capitalize relationship.otherEntityFieldName}}(this);
}
return this;
}
{{~/if}}
Expand All @@ -119,20 +121,35 @@ public {{abstractClass entity}} class {{entity.className}} {{addExtends entity}}
{{relationship.fieldName}}.set{{capitalize relationship.otherEntityFieldName}}(this);
return this;
}
public {{entity.className}} remove{{capitalize relationship.fieldName}}({{relationship.otherEntityName}} {{relationship.fieldName}}) {
this.{{relationship.fieldName}}.remove({{relationship.fieldName}});
{{relationship.fieldName}}.set{{capitalize relationship.otherEntityFieldName}}(null);
return this;
}
{{~/if}}
{{~#if (eq relationship.type 'ManyToMany')}}
public {{entity.className}} add{{capitalize relationship.fieldName}}({{relationship.otherEntityName}} {{relationship.fieldName}}) {
this.{{relationship.fieldName}}.add({{relationship.fieldName}});
{{relationship.fieldName}}.get{{capitalize relationship.otherEntityFieldName}}().add(this);
return this;
}
public {{entity.className}} remove{{capitalize relationship.fieldName}}({{relationship.otherEntityName}} {{relationship.fieldName}}) {
this.{{relationship.fieldName}}.remove({{relationship.fieldName}});
{{relationship.fieldName}}.get{{capitalize relationship.otherEntityFieldName}}().remove(this);
return this;
}
{{~/if}}
{{~#if (and (eq relationship.type 'ManyToOne') relationship.otherEntityFieldName)}}
public {{entity.className}} set{{capitalize relationship.fieldName}}({{{relationshipFieldType relationship}}} {{relationship.fieldName}}) {
this.{{relationship.fieldName}} = {{relationship.fieldName}};
{{relationship.fieldName}}.get{{capitalize relationship.otherEntityFieldName}}().add(this);
return this;
}
public {{entity.className}} remove{{capitalize relationship.fieldName}}({{{relationshipFieldType relationship}}} {{relationship.fieldName}}) {
this.{{relationship.fieldName}} = {{relationship.fieldName}};
{{relationship.fieldName}}.get{{capitalize relationship.otherEntityFieldName}}().remove(this);
return this;
}
{{~/if}}
{{/each}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@
// TODO: you may need to reload the entity here to fetch relationships 'mapped by id'
{{~> (partial '../withEvents')}}
return {{wrapWithMapper entity}};
{{~else if (isCrudMethod 'update' method=method entity=entity )}}
log.debug("[CRUD] Request to update {{entity.className}}: {}", input);
var {{entity.instanceName}} = {{entity.instanceName}}Repository.findById(id);
// saving is unnecessary: https://vladmihalcea.com/best-spring-data-jparepository/
{{entity.instanceName}} = {{entity.instanceName}}.map(existing{{entity.instanceName}} -> {{asInstanceName service.name}}Mapper.update(existing{{entity.instanceName}}, {{{mapperInputCallSignature method.parameter}}}));
{{~> (partial '../withEvents')}}
return {{wrapWithMapper entity}};
{{~else if (isCrudMethod 'list' method=method entity=entity )}}
{{~#if method.options.paginated}}
log.debug("[CRUD] Request list of {{entity.classNamePlural}}: {}", pageable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
var {{entity.instanceName}} = {{entity.instanceName}}Repository.findById(id).map(existing{{entity.className}} -> {
return {{asInstanceName service.name}}Mapper.update(existing{{entity.className}}, {{{mapperInputCallSignature method.parameter}}});
})
// saving is unnecessary https://vladmihalcea.com/best-spring-data-jparepository/
// .map({{entity.instanceName}}Repository::save)
.map({{entity.instanceName}}Repository::save)
{{~#unless (eq entity.name method.returnType)}}
.map({{asInstanceName service.name}}Mapper::as{{returnType}})
{{~/unless}}
Expand All @@ -30,8 +29,7 @@
var {{entity.instanceName}} = {{entity.instanceName}}Repository.findById(id).map(existing{{entity.className}} -> {
return {{asInstanceName service.name}}Mapper.update(existing{{entity.className}}, {{{mapperInputCallSignature method.parameter}}});
})
// saving is unnecessary https://vladmihalcea.com/best-spring-data-jparepository/
// .map({{entity.instanceName}}Repository::save)
.map({{entity.instanceName}}Repository::save)
.orElseThrow();
{{~> (partial '../withEvents')}}
return {{wrapWithMapper entity}};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package {{basePackage}}.config;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -36,64 +38,86 @@ public class DockerComposeInitializer implements ApplicationContextInitializer<C

}

// The number of services in the docker-compose.yml file
private static final int NUMBER_OF_SERVICES = 1;

static String HOST = DockerClientFactory.instance().dockerHostIpAddress();
static DockerComposeContainer container = new DockerComposeContainer(new File("src/main/docker/docker-compose.yml"))
.withEnv("HOST", HOST)
.withExposedService("mongodb", 27017, Wait.forListeningPort())
;
static boolean isContainerRunning = false;
private record Service(String name, int port, String envVar, String envValueTemplate) {}

private static final String DOCKER_COMPOSE_FILE = "./docker-compose.yml";
private static final List<Service> SERVICES = List.of(
new Service("postgresql", 5432, "DATASOURCE_URL", "jdbc:postgresql://%s:%s/DATABASENAME"),
new Service("kafka", 9092, "KAFKA_BOOTSTRAP_SERVERS", "%s:%s")
);

static String HOST = DockerClientFactory.instance().dockerHostIpAddress();
static DockerComposeContainer container = new DockerComposeContainer(new File(DOCKER_COMPOSE_FILE)).withEnv("HOST", HOST);
static {
for (Service service : SERVICES) {
if("schema-registry".equals(service.name)) {
container.withExposedService(service.name, service.port, Wait.forHttp("/subjects").forStatusCode(200));
}
else {
container.withExposedService(service.name, service.port, Wait.forListeningPort());
}
}
}
static boolean isContainerRunning = false;

@SneakyThrows
@Override
public void initialize(ConfigurableApplicationContext ctx) {
if(isDockerComposeRunningAllServices(NUMBER_OF_SERVICES)) {
if(isDockerComposeRunningAllServices(SERVICES)) {
log.info("Docker Compose Containers are running from local docker-compose. Skipping TestContainers...");
} else {
log.info("Docker Compose Containers are not running from local docker-compose. Starting from TestContainers...");
if (isContainerRunning) {
log.info("Docker Compose Containers are already running from TestContainers. Skipping...");
} else {
log.info("Starting Docker Compose Containers from TestContainers...");
container.start();
isContainerRunning = true;

// TODO: Replace with JPA...
int mongodbPort = container.getServicePort("mongodb", 27017);
log.info("Docker Compose Containers are running from TestContainers. Mongodb: {}", HOST + ":" + mongodbPort);

log.info("Container Ports Status: Mongodb: {}", isPortOpen(HOST, mongodbPort));

TestPropertyValues.of(
String.format("MONGODB_URI=mongodb://%s:%s/REVIEW?replicaSet=rs0", HOST, mongodbPort)
).applyTo(ctx.getEnvironment());
log.info("Starting Docker Compose Containers from TestContainers...");
container.start();
isContainerRunning = true;

for (Service service : SERVICES) {
int port = container.getServicePort(service.name, service.port);
log.info("DockerCompose exposed port for {}: {}", service.name, HOST + ":" + port);
log.info("DockerCompose Service {} listening: {}", service.name, isPortOpen(HOST, port));
if (service.envValueTemplate != null) {
TestPropertyValues.of(service.envVar + "=" +String.format(service.envValueTemplate, HOST, port))
.applyTo(ctx.getEnvironment());
}
}
}
}
}

private boolean isDockerComposeRunningAllServices(int numberOfServices) {
return Stream.of("docker-compose", "docker-compose.exe").anyMatch(cmd -> {
try {
return readProcessOutputStream(cmd, "-f", "src/main/docker/docker-compose.yml", "ps").size() == (numberOfServices + 1);
} catch (IOException | InterruptedException e) {
return false;
}
});
}

private List<String> readProcessOutputStream(String ...command) throws IOException, InterruptedException {
var process = new ProcessBuilder(command).start();
var reader = new java.io.BufferedReader(new java.io.InputStreamReader(process.getInputStream()));
var line = "";
var output = new ArrayList<String>();
while ((line = reader.readLine()) != null) {
output.add(line);
}
process.waitFor();
return output;
}
private boolean isDockerComposeRunningAllServices(List<Service> services) {
var serviceNames = services.stream().map(Service::name).toList();
return Stream.of("docker-compose", "docker-compose.exe").anyMatch(cmd -> {
try {
return getDockerComposeRunningServices(cmd, "-f", DOCKER_COMPOSE_FILE, "ps").containsAll(serviceNames);
}
catch (IOException | InterruptedException e) {
return false;
}
});
}

private static final int SERVICE_COLUMN = 3;
public List<String> getDockerComposeRunningServices(String... command) throws IOException, InterruptedException {
List<String> services = new ArrayList<>();
var process = new ProcessBuilder(command).start();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));

String line;
while ((line = reader.readLine()) != null) {
if (!line.isEmpty()) {
String[] columns = line.split("\\s+");
if (columns.length > SERVICE_COLUMN) {
services.add(columns[SERVICE_COLUMN]);
}
}
}

process.waitFor();
return services.size() > 1? services.subList(1, services.size()) : services;
}

boolean isPortOpen(String host, int port) {
try (Socket socket = new Socket(host, port)) {
Expand Down
Loading

0 comments on commit becf53a

Please sign in to comment.