Skip to content

Commit

Permalink
feat: SpringCloudStreams3Generator adds Dead Letter Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ivangsa committed Dec 6, 2022
1 parent 6e1ebd7 commit 282a102
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package {{apiPackage}};

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import org.slf4j.Logger;
Expand All @@ -8,6 +10,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

Expand All @@ -25,18 +28,19 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Consu
protected {{serviceName operation.x--operationIdCamelCase}} service;
protected StreamBridge streamBridge;

protected String deadLetterQueue;
protected Map<Class<? extends Exception>, String> errorQueueMap;

public {{consumerName operation.x--operationIdCamelCase}}({{serviceName operation.x--operationIdCamelCase}} service, @Autowired(required=false) StreamBridge streamBridge) {
this.service = service;
this.streamBridge = streamBridge;
}

@Value("${spring.cloud.stream.bindings.{{operation.x--operationIdKebabCase}}-in-0.dead-letter-queue-binding-name:null}")
public void setDeadLetterQueue(String deadLetterQueue) {
this.deadLetterQueue = deadLetterQueue;
@Value("#{${spring.cloud.stream.bindings.on-customer-event-in-0.dead-letter-queue-error-map:{:}}}")
public void setErrorQueueMap(Map<Class<? extends Exception>, String> errorQueueMap) {
this.errorQueueMap = errorQueueMap;
}


@Override
public void accept(Message<{{messageType operation}}> message) {
log.debug("Received message: {}", message);
Expand All @@ -56,16 +60,39 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Consu
{{~/each}}
log.warn("Received message without any business handler: [payload: {}, message: {}]", payload.getClass().getName(), message);
} catch (Exception e) {
log.error("Error processing message: {}", message, e);
if (streamBridge != null && deadLetterQueue != null) {
log.debug("Sending message to dead letter queue: {}", deadLetterQueue);
if(log.isDebugEnabled()) {
log.error("Error processing message: {}", message, e);
} else {
log.error("Error processing message: {}", message);
}

String resolvedDLQ = resolveDeadLetterQueue(e, message);
if (streamBridge != null && resolvedDLQ != null) {
try {
streamBridge.send(deadLetterQueue, MessageBuilder.fromMessage(message).build());
log.debug("Sending message to dead letter queue: {}", resolvedDLQ);
var headers = new HashMap(message.getHeaders());
headers.put("x-exception-type", e.getClass().getName());
headers.put("x-exception-message", e.getMessage());
headers.put("x-exception-stacktrace ", e.getStackTrace());
headers.put("x-exception-payload-type", message.getPayload().getClass());
streamBridge.send(resolvedDLQ, MessageBuilder.createMessage(message.getPayload(), new MessageHeaders(headers)));
return;
} catch (Exception e1) {
log.error("Error sending message to dead letter queue: {}", deadLetterQueue, e1);
log.error("Error sending message to dead letter queue: {}", resolvedDLQ, e1);
}
}
throw e;
}
}

protected String resolveDeadLetterQueue(Exception e, Message message) {
if(errorQueueMap != null) {
for (Map.Entry<Class<? extends Exception>, String> entry : errorQueueMap.entrySet()) {
if(entry.getKey().isAssignableFrom(e.getClass())) {
return entry.getValue();
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package {{apiPackage}};

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Function;
import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
{{#if exposeMessage}}
Expand All @@ -18,7 +18,7 @@ import {{modelPackage}}.*;
{{/if}}

@Component("{{operation.x--operationIdKebabCase}}")
public class {{consumerName operation.x--operationIdCamelCase}} implements Function<Flux<Message<{{messageType operation}}>>, Mono<Void>> {
public class {{consumerName operation.x--operationIdCamelCase}} implements Consumer<Flux<Message<{{messageType operation}}>>> {

protected Logger log = LoggerFactory.getLogger(getClass());

Expand All @@ -30,9 +30,9 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Funct


@Override
public Mono<Void> apply(Flux<Message<{{messageType operation}}>> messageFlux) {
log.debug("Received message: {}", messageFlux);
return messageFlux.flatMap(m -> {
public void accept(Flux<Message<{{messageType operation}}>> messageFlux) {
messageFlux.subscribe(m -> {
log.debug("Received message: {}", m);
{{#each operation.x--messages as |message|}}
if(m.getPayload() instanceof {{message.x--javaType}}) {
{{~#if exposeMessage}}
Expand All @@ -42,7 +42,6 @@ public class {{consumerName operation.x--operationIdCamelCase}} implements Funct
{{~/if}}
}
{{/each}}
return Mono.empty();
}).then();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public interface {{serviceName operation.x--operationIdCamelCase}} {
{{~> (lookup . 'headersPartial') isProducer=false}}

{{~#if exposeMessage}}
default Mono<Void> {{operation.operationId}}{{methodSuffix message operation}}(Flux<Message<{{message.x--javaType}}>> msg) { return Mono.empty(); };
default void {{operation.operationId}}{{methodSuffix message operation}}(Flux<Message<{{message.x--javaType}}>> messageFlux) { };
{{~else}}
default Mono<Void> {{operation.operationId}}{{methodSuffix message operation}}(Flux<{{message.x--javaType}}> msg) { return Mono.empty(); };
default void {{operation.operationId}}{{methodSuffix message operation}}(Flux<{{message.x--javaType}}> messageFlux) { };
{{~/if}}
{{/each}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package {{apiPackage}};

import java.util.Map;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
Expand All @@ -27,43 +28,36 @@ public class {{apiClassName}} implements I{{apiClassName}} {

private Logger log = LoggerFactory.getLogger(getClass());

public String sqlSaveMessageToOutbox = "INSERT INTO {tableName} (binding_name, type, payload, headers) VALUES (?, ?, ?, ?)";
{{~#each operations as |operation|}}
public String {{operation.operationId}}BindingName = "{{operation.x--operationIdKebabCase}}-out{{bindingSuffix}}";
public String {{operation.operationId}}OutboxTableName = "{{snakeCase operation.operationId}}_outbox";
{{~/each}}

protected StreamBridge streamBridge;
protected JdbcTemplate jdbcTemplate;

protected ObjectMapper objectMapper = new ObjectMapper();

public {{apiClassName}}(StreamBridge streamBridge, JdbcTemplate jdbcTemplate) {
this.streamBridge = streamBridge;
this.jdbcTemplate = jdbcTemplate;
}

{{~#each operations as |operation|}}
public String {{operation.operationId}}BindingName = "{{operation.x--operationIdKebabCase}}-out{{bindingSuffix}}";
public String {{operation.operationId}}OutboxTableName = "{{operation.x--operationIdKebabCase}}-outbox";
{{/each}}
public String sqlSaveMessageToOutbox = "INSERT INTO {tableName} (payload, headers) VALUES (?, ?)";
public String sqlUpdateSentOutbox = "UPDATE {tableName} SET sent = true WHERE id = ?";

public void sendOutboxMessage(String id, String payloadJson, String headersJson, String tableName, String bindingName) {
public void sendOutboxMessage(Map<String, Object> outboxTableRow) throws Exception {
String bindingName = (String) outboxTableRow.get("binding_name");
log.debug("Sending outbox message to stream {}", bindingName);
try {
var payload = objectMapper.readValue(payloadJson, Map.class);
var headers = objectMapper.readValue(headersJson, Map.class);
Message message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
streamBridge.send(bindingName, message);
String sql = sqlSaveMessageToOutbox.replace("{tableName}", tableName);
jdbcTemplate.update(sql, id);
} catch (JsonProcessingException e) {
log.error("Error unmarshalling outbox messages [payload= {}, headers= , binding={}]", payloadJson, headersJson, bindingName, e);
} catch (DataAccessException dataAccessException) {
log.error("Error updating outbox while sending message [table={}]", tableName, dataAccessException);
}

var type = (String) outboxTableRow.get("type");
var payload = fromColumnData(outboxTableRow.get("payload"), type);
var headers = fromHeadersColumn((String) outboxTableRow.get("headers"));
Message message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
streamBridge.send(bindingName, message);
}

protected boolean saveMessageToOutbox(Message message, String tableName) {
protected boolean saveMessageToOutbox(Message message, String tableName, String bindingName) {
log.debug("Saving message to outbox tableName {}", tableName);
String sql = sqlSaveMessageToOutbox.replace("{tableName}", tableName);
return jdbcTemplate.update(sql, asString(message.getPayload()), asString(message.getHeaders())) == 1;
return jdbcTemplate.update(sql, bindingName, message.getPayload().getClass().getName(), toColumnData(message.getPayload()), toHeadersColumn(message.getHeaders())) == 1;
}


Expand All @@ -75,22 +69,45 @@ public class {{apiClassName}} implements I{{apiClassName}} {
public boolean {{operation.operationId}}({{message.x--javaType}} payload, {{message.x--javaTypeSimpleName}}Headers headers) {
log.debug("Sending message to stream {} via outbox", {{operation.operationId}}BindingName);
Message message = MessageBuilder.createMessage(payload, new MessageHeaders(headers));
//return streamBridge.send(onCustomerEventBindingName, message);
return saveMessageToOutbox(message, {{operation.operationId}}OutboxTableName);
//return streamBridge.send({{operation.operationId}}BindingName, message);
return saveMessageToOutbox(message, {{operation.operationId}}OutboxTableName, {{operation.operationId}}BindingName);
}

{{/each}}
{{/each}}

protected String asString(Object value) {
protected Map fromHeadersColumn(String value) throws Exception {
return objectMapper.readValue(value, Map.class);
}

protected String toHeadersColumn(Map headers) {
try {
return objectMapper.writeValueAsString(headers);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

protected Object fromColumnData(Object value, String type) throws Exception {
return objectMapper.readValue((String) value, Class.forName(type));
}

protected Object toColumnData(Object value) {
if (value == null) {
return null;
}
try {
objectMapper.addMixIn(Object.class, IgnoreAvroSchemaPropertyMixIn.class);
return objectMapper.writeValueAsString(value);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

abstract class IgnoreAvroSchemaPropertyMixIn {
@JsonIgnore
abstract void getSpecificData();
@JsonIgnore
abstract void getSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public static boolean eq(String first, Options options) throws IOException {
return StringUtils.equals(first, second);
}

public static boolean startsWith(String first, Options options) throws IOException {
String second = options.param(0);
return StringUtils.startsWith(first, second);
}

public static boolean not(Object value, Options options) throws IOException {
if (value == null) {
return true;
Expand Down

0 comments on commit 282a102

Please sign in to comment.