Skip to content

Commit

Permalink
Create cursor if it does not exist also use start and end dates passe…
Browse files Browse the repository at this point in the history
…d in the composition request
  • Loading branch information
shafiquech committed Oct 31, 2023
1 parent f895a3b commit b8fd2ad
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 8 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.

## [3.66.0](https://github.com/Backbase/stream-services/compare/3.65.0...3.66.0)
### Changed
- Always check cursor and create if it does not exist.
- If `DateRangeEnd` is passed in the composition request set that as lastTxnDate instead of system date.

## [3.65.0](https://github.com/Backbase/stream-services/compare/3.64.0...3.65.0)
### Changed
- Move call to processAudiencesSegmentation after setupUsers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Mono<TransactionIngestResponse> ingestPull(TransactionIngestPullRequest i
.map(f -> filterExisting(f, ingestPullRequest.getLastIngestedExternalIds()))
.flatMap(this::sendToDbs)
.doOnSuccess(list -> handleSuccess(
ingestPullRequest.getArrangementId(), true, list))
ingestPullRequest.getArrangementId(), true, list, ingestPullRequest.getDateRangeEnd()))
.onErrorResume(e -> handleError(
ingestPullRequest.getArrangementId(), true, e))
.map(list -> buildResponse(list, ingestPullRequest));
Expand Down Expand Up @@ -88,7 +88,7 @@ private Mono<TransactionIngestPullRequest> buildIntegrationRequest(TransactionIn
transactionIngestPullRequest.setDateRangeEnd(dateRangeEndFromRequest == null
? currentTime : dateRangeEndFromRequest);

if (dateRangeStartFromRequest == null && config.isCursorEnabled()) {
if (config.isCursorEnabled()) {
log.info("Transaction Cursor is enabled and Request has no Start Date");
return getCursor(transactionIngestPullRequest)
.switchIfEmpty(createCursor(transactionIngestPullRequest));
Expand All @@ -107,7 +107,7 @@ public Mono<TransactionIngestResponse> ingestPush(TransactionIngestPushRequest i
return Mono.just(Flux.fromIterable(ingestPushRequest.getTransactions()))
.flatMap(this::sendToDbs)
.doOnSuccess(list -> handleSuccess(
ingestPushRequest.getArrangementId(), false, list))
ingestPushRequest.getArrangementId(), false, list, null))
.onErrorResume(e -> handleError(
ingestPushRequest.getArrangementId(), false, e))
.map(list -> buildResponse(list, ingestPushRequest));
Expand Down Expand Up @@ -222,7 +222,7 @@ private TransactionIngestResponse buildResponse(List<TransactionsPostResponseBod
}

private void handleSuccess(String arrangementId, boolean pullMode,
List<TransactionsPostResponseBody> transactions) {
List<TransactionsPostResponseBody> transactions, OffsetDateTime dateRangeEnd) {
if (config.isCursorEnabled() && pullMode) {
String lastTxnIds = null;
if (config.isTransactionIdsFilterEnabled()) {
Expand All @@ -231,15 +231,22 @@ private void handleSuccess(String arrangementId, boolean pullMode,
.collect(Collectors.joining(DELIMITER));
}
patchCursor(arrangementId, buildPatchCursorRequest(
TransactionCursor.StatusEnum.SUCCESS,
OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat)),
lastTxnIds));
TransactionCursor.StatusEnum.SUCCESS, getLastTxnDate(dateRangeEnd),
lastTxnIds));
}

transactionPostIngestionService.handleSuccess(transactions);

log.debug("Ingested transactions: {}", transactions);
}

private String getLastTxnDate(OffsetDateTime dateRangeEnd) {
String lastTxnDate = OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat));
if (dateRangeEnd != null) {
lastTxnDate = dateRangeEnd.format(DateTimeFormatter.ofPattern(dateFormat));
}
return lastTxnDate;
}

private Mono<List<TransactionsPostResponseBody>> handleError(String arrangementId,
boolean pullMode, Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ void ingestionInPullModePatchCursor_Success() {

mockConfigForTransaction();
mockTransactionService();
when(transactionCursorApi.patchByArrangementId(anyString(), any())).thenReturn(Mono.empty());
TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse();
mockCursorApiForTransactions(transactionCursorResponse, false);

TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest();
transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now());
Expand Down Expand Up @@ -248,4 +249,28 @@ void ingestionInPushMode_Success() {
StepVerifier.create(productIngestResponse)
.assertNext(Assertions::assertNotNull).verifyComplete();
}

@Test
void ingestionInPullModePatchCursor_Success_withDates() {

mockConfigForTransaction();
mockTransactionService();
TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse();
mockCursorApiForTransactions(transactionCursorResponse, false);

TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest();
transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now().minusDays(10));
transactionIngestPullRequest.setDateRangeEnd(OffsetDateTime.now().minusDays(5));

when(transactionIntegrationService.pullTransactions(transactionIngestPullRequest))
.thenReturn(Flux.just(new TransactionsPostRequestBody().withType("type1").
withArrangementId("1234").withReference("ref")
.withExternalArrangementId("externalArrId")));

Mono<TransactionIngestResponse> productIngestResponse = transactionIngestionService
.ingestPull(transactionIngestPullRequest);
StepVerifier.create(productIngestResponse)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
}
}

0 comments on commit b8fd2ad

Please sign in to comment.