diff --git a/jbpm/jbpm-tests/src/test/java/org/jbpm/bpmn2/ActivityTest.java b/jbpm/jbpm-tests/src/test/java/org/jbpm/bpmn2/ActivityTest.java index 2578981db0b..c6a71ea45bf 100755 --- a/jbpm/jbpm-tests/src/test/java/org/jbpm/bpmn2/ActivityTest.java +++ b/jbpm/jbpm-tests/src/test/java/org/jbpm/bpmn2/ActivityTest.java @@ -51,6 +51,10 @@ import org.jbpm.bpmn2.flow.MinimalWithDIGraphicalProcess; import org.jbpm.bpmn2.flow.MinimalWithGraphicalModel; import org.jbpm.bpmn2.flow.MinimalWithGraphicalProcess; +import org.jbpm.bpmn2.flow.ProcessCustomDescriptionMetaDataModel; +import org.jbpm.bpmn2.flow.ProcessCustomDescriptionMetaDataProcess; +import org.jbpm.bpmn2.flow.ProcessVariableCustomDescriptionMetaDataModel; +import org.jbpm.bpmn2.flow.ProcessVariableCustomDescriptionMetaDataProcess; import org.jbpm.bpmn2.flow.ProcessWithVariableNameModel; import org.jbpm.bpmn2.flow.ProcessWithVariableNameProcess; import org.jbpm.bpmn2.flow.UserTaskModel; @@ -86,8 +90,16 @@ import org.jbpm.bpmn2.subprocess.CallActivityMIProcess; import org.jbpm.bpmn2.subprocess.CallActivityModel; import org.jbpm.bpmn2.subprocess.CallActivityProcess; +import org.jbpm.bpmn2.subprocess.CallActivityProcessBoundaryErrorModel; +import org.jbpm.bpmn2.subprocess.CallActivityProcessBoundaryErrorProcess; +import org.jbpm.bpmn2.subprocess.CallActivitySubProcessBoundaryErrorModel; +import org.jbpm.bpmn2.subprocess.CallActivitySubProcessBoundaryErrorProcess; import org.jbpm.bpmn2.subprocess.CallActivitySubProcessModel; import org.jbpm.bpmn2.subprocess.CallActivitySubProcessProcess; +import org.jbpm.bpmn2.subprocess.CallActivitySubProcessWithBoundaryEventModel; +import org.jbpm.bpmn2.subprocess.CallActivitySubProcessWithBoundaryEventProcess; +import org.jbpm.bpmn2.subprocess.CallActivityWithBoundaryEventModel; +import org.jbpm.bpmn2.subprocess.CallActivityWithBoundaryEventProcess; import org.jbpm.bpmn2.subprocess.CallActivityWithIOexpressionModel; import org.jbpm.bpmn2.subprocess.CallActivityWithIOexpressionProcess; import org.jbpm.bpmn2.subprocess.InputMappingUsingValueModel; @@ -969,31 +981,26 @@ public void testScriptTaskWithVariableByName() throws Exception { } @Test - public void testCallActivityWithBoundaryEvent() throws Exception { - ProcessCompletedCountDownProcessEventListener countDownListener = new ProcessCompletedCountDownProcessEventListener(); - kruntime = createKogitoProcessRuntime( - "org/jbpm/bpmn2/subprocess/BPMN2-CallActivityWithBoundaryEvent.bpmn2", - "org/jbpm/bpmn2/subprocess/BPMN2-CallActivitySubProcessWithBoundaryEvent.bpmn2"); - kruntime.getProcessEventManager().addEventListener(countDownListener); - + public void testCallActivityWithBoundaryEvent() { + Application app = ProcessTestHelper.newApplication(); TestWorkItemHandler workItemHandler = new TestWorkItemHandler(); - kruntime.getKogitoWorkItemManager().registerWorkItemHandler("Human Task", - workItemHandler); - Map params = new HashMap<>(); - params.put("x", "oldValue"); - KogitoProcessInstance processInstance = kruntime.startProcess("CallActivityWithBoundaryEvent", params); - - countDownListener.waitTillCompleted(); - - assertProcessInstanceFinished(processInstance, kruntime); - // assertEquals("new timer value", - // ((WorkflowProcessInstance) processInstance).getVariable("y")); - // first check the parent process executed nodes - assertNodeTriggered(processInstance.getStringId(), "StartProcess", - "CallActivity", "Boundary event", "Script Task", "end"); - // then check child process executed nodes - is there better way to get child process id than simply increment? - assertNodeTriggered(processInstance.getStringId() + 1, "StartProcess2", - "User Task"); + ProcessTestHelper.registerHandler(app, "Human Task", workItemHandler); + ProcessCompletedCountDownProcessEventListener listener = new ProcessCompletedCountDownProcessEventListener(); + ProcessTestHelper.registerProcessEventListener(app, listener); + org.kie.kogito.process.Process callActivitySubProcessWithBoundaryEventProcess = CallActivitySubProcessWithBoundaryEventProcess.newProcess(app); + org.kie.kogito.process.Process process = CallActivityWithBoundaryEventProcess.newProcess(app); + CallActivityWithBoundaryEventModel model = process.createModel(); + model.setX("oldValue"); + ProcessInstance processInstance = process.createInstance(model); + processInstance.start(); + listener.waitTillCompleted(15000); + assertThat(processInstance).extracting(ProcessInstance::status).isEqualTo(ProcessInstance.STATE_COMPLETED); + Collection processNodes = process.findNodes(Objects::nonNull).stream().map(Node::getName).collect(Collectors.toSet()); + Collection subProcessNodes = callActivitySubProcessWithBoundaryEventProcess.findNodes(Objects::nonNull).stream().map(Node::getName).collect(Collectors.toSet()); + assertThat(processNodes.containsAll(List.of("StartProcess", + "CallActivity", "Boundary event", "Script Task", "end"))).isTrue(); + assertThat(subProcessNodes.containsAll(List.of("StartProcess2", + "User Task"))).isTrue(); } @Test @@ -1059,45 +1066,56 @@ public void testUserTaskWithSimData() { } @Test - public void testCallActivityWithBoundaryErrorEvent() throws Exception { - kruntime = createKogitoProcessRuntime( - "org/jbpm/bpmn2/subprocess/BPMN2-CallActivityProcessBoundaryError.bpmn2", - "org/jbpm/bpmn2/subprocess/BPMN2-CallActivitySubProcessBoundaryError.bpmn2"); - - kruntime.getKogitoWorkItemManager().registerWorkItemHandler("task1", - new SystemOutWorkItemHandler()); - KogitoProcessInstance processInstance = kruntime.startProcess("CallActivityProcessBoundaryError"); + public void testCallActivityWithBoundaryErrorEvent() { + Application app = ProcessTestHelper.newApplication(); + SystemOutWorkItemHandler workItemHandler = new SystemOutWorkItemHandler(); + ProcessTestHelper.registerHandler(app, "task1", workItemHandler); + ProcessCompletedCountDownProcessEventListener listener = new ProcessCompletedCountDownProcessEventListener(); + ProcessTestHelper.registerProcessEventListener(app, listener); + org.kie.kogito.process.Process callActivitySubProcessBoundaryErrorProcess = CallActivitySubProcessBoundaryErrorProcess.newProcess(app); + org.kie.kogito.process.Process process = CallActivityProcessBoundaryErrorProcess.newProcess(app); + CallActivityProcessBoundaryErrorModel model = process.createModel(); + ProcessInstance processInstance = process.createInstance(model); + processInstance.start(); + assertThat(processInstance).extracting(ProcessInstance::status).isEqualTo(ProcessInstance.STATE_COMPLETED); - assertProcessInstanceFinished(processInstance, kruntime); - assertNodeTriggered(processInstance.getStringId(), "StartProcess", - "Call Activity 1", "Boundary event", "Task Parent", "End2"); - // then check child process executed nodes - is there better way to get child process id than simply increment? - assertNodeTriggered(processInstance.getStringId() + 1, "StartProcess", "Task 1", "End"); + Collection processNodes = process.findNodes(Objects::nonNull).stream().map(Node::getName).collect(Collectors.toSet()); + Collection subProcessNodes = callActivitySubProcessBoundaryErrorProcess.findNodes(Objects::nonNull).stream() + .map(Node::getName).collect(Collectors.toSet()); + assertThat(processNodes.containsAll(List.of("StartProcess", + "Call Activity 1", "Boundary event", "Task Parent", "End2"))).isTrue(); + assertThat(subProcessNodes.containsAll(List.of("StartProcess", "Task 1", "End"))).isTrue(); } @Test - public void testCallActivityWithBoundaryErrorEventWithWaitState() throws Exception { - kruntime = createKogitoProcessRuntime( - "org/jbpm/bpmn2/subprocess/BPMN2-CallActivityProcessBoundaryError.bpmn2", - "org/jbpm/bpmn2/subprocess/BPMN2-CallActivitySubProcessBoundaryError.bpmn2"); - + public void testCallActivityWithBoundaryErrorEventWithWaitState() { + Application app = ProcessTestHelper.newApplication(); TestWorkItemHandler workItemHandler = new TestWorkItemHandler(); - kruntime.getKogitoWorkItemManager().registerWorkItemHandler("task1", workItemHandler); - KogitoProcessInstance processInstance = kruntime.startProcess("CallActivityProcessBoundaryError"); + ProcessTestHelper.registerHandler(app, "task1", workItemHandler); + + org.kie.kogito.process.Process callActivitySubProcessBoundaryErrorProcess = CallActivitySubProcessBoundaryErrorProcess.newProcess(app); + ProcessInstance subProcessInstance = callActivitySubProcessBoundaryErrorProcess + .createInstance(callActivitySubProcessBoundaryErrorProcess.createModel()); + org.kie.kogito.process.Process process = CallActivityProcessBoundaryErrorProcess.newProcess(app); + CallActivityProcessBoundaryErrorModel model = process.createModel(); + ProcessInstance processInstance = process.createInstance(model); + processInstance.start(); org.kie.kogito.internal.process.runtime.KogitoWorkItem workItem = workItemHandler.getWorkItem(); assertThat(workItem).isNotNull(); - kruntime.getKogitoWorkItemManager().completeWorkItem(workItem.getStringId(), null); + + subProcessInstance.completeWorkItem(workItem.getStringId(), Collections.emptyMap()); workItem = workItemHandler.getWorkItem(); assertThat(workItem).isNotNull(); - kruntime.getKogitoWorkItemManager().completeWorkItem(workItem.getStringId(), null); - - assertProcessInstanceFinished(processInstance, kruntime); - assertNodeTriggered(processInstance.getStringId(), "StartProcess", - "Call Activity 1", "Boundary event", "Task Parent", "End2"); - // then check child process executed nodes - is there better way to get child process id than simply increment? - assertNodeTriggered(processInstance.getStringId() + 1, "StartProcess", "Task 1", "End"); + processInstance.completeWorkItem(workItem.getStringId(), Collections.emptyMap()); + assertThat(processInstance).extracting(ProcessInstance::status).isEqualTo(ProcessInstance.STATE_COMPLETED); + Collection processNodes = process.findNodes(Objects::nonNull).stream().map(Node::getName).collect(Collectors.toSet()); + Collection subProcessNodes = callActivitySubProcessBoundaryErrorProcess.findNodes(Objects::nonNull).stream() + .map(Node::getName).collect(Collectors.toSet()); + assertThat(processNodes.containsAll(List.of("StartProcess", + "Call Activity 1", "Boundary event", "Task Parent", "End2"))).isTrue(); + assertThat(subProcessNodes.containsAll(List.of("StartProcess", "Task 1", "End"))).isTrue(); } @Test @@ -1246,28 +1264,27 @@ public void testErrorBetweenProcessesProcess() throws Exception { } @Test - public void testProcessCustomDescriptionMetaData() throws Exception { - kruntime = createKogitoProcessRuntime("org/jbpm/bpmn2/flow/BPMN2-ProcessCustomDescriptionMetaData.bpmn2"); - - Map params = new HashMap<>(); - - KogitoProcessInstance processInstance = kruntime.startProcess("ProcessCustomDescriptionMetaData", params); - assertProcessInstanceCompleted(processInstance); - - String description = processInstance.getDescription(); + public void testProcessCustomDescriptionMetaData() { + Application app = ProcessTestHelper.newApplication(); + org.kie.kogito.process.Process process = ProcessCustomDescriptionMetaDataProcess.newProcess(app); + ProcessCustomDescriptionMetaDataModel model = process.createModel(); + ProcessInstance processInstance = process.createInstance(model); + processInstance.start(); + assertThat(processInstance.status()).isEqualTo(ProcessInstance.STATE_COMPLETED); + String description = processInstance.description(); assertThat(description).isNotNull().isEqualTo("my process with description"); } @Test - public void testProcessVariableCustomDescriptionMetaData() throws Exception { - kruntime = createKogitoProcessRuntime("org/jbpm/bpmn2/flow/BPMN2-ProcessVariableCustomDescriptionMetaData.bpmn2"); - - Map params = new HashMap<>(); - params.put("x", "variable name for process"); - KogitoProcessInstance processInstance = kruntime.startProcess("ProcessVariableCustomDescriptionMetaData", params); - assertProcessInstanceCompleted(processInstance); - - String description = processInstance.getDescription(); + public void testProcessVariableCustomDescriptionMetaData() { + Application app = ProcessTestHelper.newApplication(); + org.kie.kogito.process.Process process = ProcessVariableCustomDescriptionMetaDataProcess.newProcess(app); + ProcessVariableCustomDescriptionMetaDataModel model = process.createModel(); + model.setX("variable name for process"); + ProcessInstance processInstance = process.createInstance(model); + processInstance.start(); + assertThat(processInstance.status()).isEqualTo(ProcessInstance.STATE_COMPLETED); + String description = processInstance.description(); assertThat(description).isNotNull().isEqualTo("variable name for process"); } diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java index 6c95879a018..7aa2b2eaf55 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-builder/src/main/java/org/kie/kogito/serverless/workflow/parser/handlers/EventHandler.java @@ -59,7 +59,6 @@ public MakeNodeResult makeNode(RuleFlowNodeContainerFactory factory) { handleErrors(factory, embeddedContainer); return new MakeNodeResult(embeddedContainer); } - } private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory factory, OnEvents onEvent) { @@ -67,6 +66,9 @@ private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory factory onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), getVarName(), (f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar))); CompositeContextNodeFactory embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions()); + if (isStartState) { + handleErrors(factory, embeddedSubProcess); + } connect(result.getOutgoingNode(), embeddedSubProcess); return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess); } diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties index 8b83e2da2f9..cb8cbdd9adf 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/application.properties @@ -51,6 +51,10 @@ quarkus.rest-client.enum-parameter_yaml.url=${enum-echo-service-mock.url} # Error handling properties kogito.sw.functions.publishPerfectSquare.host=localhost + +mp.messaging.incoming.start.connector=quarkus-http +mp.messaging.incoming.start.path=/startWithError + mp.messaging.incoming.move.connector=quarkus-http mp.messaging.incoming.move.path=/move diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json new file mode 100644 index 00000000000..e1e87032bee --- /dev/null +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/eventStartWithError.sw.json @@ -0,0 +1,76 @@ +{ + "id": "startEventError", + "version": "1.0", + "name": "Workflow event test", + "description": "An test of a starting event with error on action", + "start": "waitForEvent", + "events": [ + { + "name": "startEvent", + "source": "", + "type": "start" + } + ], + "errors": [ + { + "name": "odd number", + "code": "Odd situation" + } + ], + "functions": [ + { + "name": "publishEvenError", + "type": "asyncapi", + "operation": "specs/callbackResults.yaml#sendEvenError" + }, + { + "name": "isEven", + "type": "custom", + "operation": "service:java:org.kie.kogito.workflows.services.EvenService::isEven" + } + ] + , + "states": [ + { + "name": "waitForEvent", + "type": "event", + "onEvents": [ + { + "eventRefs": [ + "startEvent" + ], + "actions": [ + { + "name": "actionWithError", + "functionRef": { + "refName": "isEven", + "arguments": { + "number": ".number" + } + } + } + ] + + } + ], + "onErrors": [ + { + "errorRef": "odd number", + "transition": "PublishError" + } + ], + "end":true + }, + { + "name": "PublishError", + "type": "operation", + "actions": [ + { + "name": "publishEvenError", + "functionRef": "publishEvenError" + } + ], + "end": "true" + } + ] +} \ No newline at end of file diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml index be2797bae7c..7b7b97d2776 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/main/resources/specs/callbackResults.yaml @@ -54,6 +54,13 @@ channels: summary: Timeout Expired message: $ref: '#/components/messages/message' + sendEvenError: + description: A message channel for publishing errors + publish: + operationId: sendEvenError + summary: Reporting error + message: + $ref: '#/components/messages/message' error: description: A message channel for failed executions publish: diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java index bd96c501170..dceacae493b 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventFlowIT.java @@ -62,16 +62,16 @@ static void init() { } @Test - void testStartingEventWithToStateFilter() { + void testStartingEventWithToStateFilter() throws IOException { given() .contentType(ContentType.JSON) .when() - .body(CloudEventBuilder.v1() + .body(defaultMarshaller.marshall(CloudEventBuilder.v1() .withId(UUID.randomUUID().toString()) .withSource(URI.create("customer-arrival-event-source")) .withType("customer-arrival-type") .withTime(OffsetDateTime.now()) - .withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build()) + .withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build())) .post("/eventWithToStateFilter") .then() .statusCode(202); diff --git a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java index 544b016549d..4121ca04805 100644 --- a/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java +++ b/quarkus/extensions/kogito-quarkus-serverless-workflow-extension/kogito-quarkus-serverless-workflow-integration-test/src/test/java/org/kie/kogito/quarkus/workflows/EventTimedoutIT.java @@ -19,7 +19,11 @@ package org.kie.kogito.quarkus.workflows; import java.io.IOException; +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Collections; import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -28,8 +32,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.kie.kogito.event.CloudEventMarshaller; import org.kie.kogito.event.Converter; import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants; +import org.kie.kogito.event.impl.ByteArrayCloudEventMarshaller; import org.kie.kogito.event.impl.ByteArrayCloudEventUnmarshallerFactory; import org.kie.kogito.test.quarkus.QuarkusTestProperty; import org.kie.kogito.test.quarkus.kafka.KafkaTypedTestClient; @@ -41,10 +47,14 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.jackson.JsonFormat; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.restassured.RestAssured; +import io.restassured.http.ContentType; +import static io.restassured.RestAssured.given; import static org.assertj.core.api.Assertions.assertThat; import static org.kie.kogito.quarkus.workflows.AssuredTestUtils.startProcess; @@ -59,13 +69,17 @@ public class EventTimedoutIT { private ObjectMapper objectMapper; private KafkaTypedTestClient kafkaClient; + private static CloudEventMarshaller defaultMarshaller; + @BeforeEach void setup() { + RestAssured.enableLoggingOfRequestAndResponseIfValidationFails(); kafkaClient = new KafkaTypedTestClient<>(kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class); objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()) .registerModule(JsonFormat.getCloudEventJacksonModule()) .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + defaultMarshaller = new ByteArrayCloudEventMarshaller(objectMapper); } @AfterEach @@ -93,4 +107,26 @@ void testTimedout() throws InterruptedException { countDownLatch.await(10, TimeUnit.SECONDS); assertThat(countDownLatch.getCount()).isZero(); } + + @Test + void testStartEventWithError() throws InterruptedException, IOException { + given() + .contentType(ContentType.JSON) + .when() + .body(defaultMarshaller.marshall(CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("source")) + .withType("start") + .withTime(OffsetDateTime.now()) + .withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("number", 3))).build())) + .post("/startWithError") + .then() + .statusCode(202); + final CountDownLatch countDownLatch = new CountDownLatch(1); + kafkaClient.consume("sendEvenError", v -> { + countDownLatch.countDown(); + }); + countDownLatch.await(10, TimeUnit.SECONDS); + assertThat(countDownLatch.getCount()).isZero(); + } }