Skip to content

Commit

Permalink
Merge pull request #383 from Backbase/feature/cursor-dates
Browse files Browse the repository at this point in the history
Always check cursor and create if it does not exist
  • Loading branch information
shafiquech authored Oct 31, 2023
2 parents f895a3b + b8fd2ad commit 36d13e5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 8 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.

## [3.66.0](https://github.com/Backbase/stream-services/compare/3.65.0...3.66.0)
### Changed
- Always check cursor and create if it does not exist.
- If `DateRangeEnd` is passed in the composition request set that as lastTxnDate instead of system date.

## [3.65.0](https://github.com/Backbase/stream-services/compare/3.64.0...3.65.0)
### Changed
- Move call to processAudiencesSegmentation after setupUsers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Mono<TransactionIngestResponse> ingestPull(TransactionIngestPullRequest i
.map(f -> filterExisting(f, ingestPullRequest.getLastIngestedExternalIds()))
.flatMap(this::sendToDbs)
.doOnSuccess(list -> handleSuccess(
ingestPullRequest.getArrangementId(), true, list))
ingestPullRequest.getArrangementId(), true, list, ingestPullRequest.getDateRangeEnd()))
.onErrorResume(e -> handleError(
ingestPullRequest.getArrangementId(), true, e))
.map(list -> buildResponse(list, ingestPullRequest));
Expand Down Expand Up @@ -88,7 +88,7 @@ private Mono<TransactionIngestPullRequest> buildIntegrationRequest(TransactionIn
transactionIngestPullRequest.setDateRangeEnd(dateRangeEndFromRequest == null
? currentTime : dateRangeEndFromRequest);

if (dateRangeStartFromRequest == null && config.isCursorEnabled()) {
if (config.isCursorEnabled()) {
log.info("Transaction Cursor is enabled and Request has no Start Date");
return getCursor(transactionIngestPullRequest)
.switchIfEmpty(createCursor(transactionIngestPullRequest));
Expand All @@ -107,7 +107,7 @@ public Mono<TransactionIngestResponse> ingestPush(TransactionIngestPushRequest i
return Mono.just(Flux.fromIterable(ingestPushRequest.getTransactions()))
.flatMap(this::sendToDbs)
.doOnSuccess(list -> handleSuccess(
ingestPushRequest.getArrangementId(), false, list))
ingestPushRequest.getArrangementId(), false, list, null))
.onErrorResume(e -> handleError(
ingestPushRequest.getArrangementId(), false, e))
.map(list -> buildResponse(list, ingestPushRequest));
Expand Down Expand Up @@ -222,7 +222,7 @@ private TransactionIngestResponse buildResponse(List<TransactionsPostResponseBod
}

private void handleSuccess(String arrangementId, boolean pullMode,
List<TransactionsPostResponseBody> transactions) {
List<TransactionsPostResponseBody> transactions, OffsetDateTime dateRangeEnd) {
if (config.isCursorEnabled() && pullMode) {
String lastTxnIds = null;
if (config.isTransactionIdsFilterEnabled()) {
Expand All @@ -231,15 +231,22 @@ private void handleSuccess(String arrangementId, boolean pullMode,
.collect(Collectors.joining(DELIMITER));
}
patchCursor(arrangementId, buildPatchCursorRequest(
TransactionCursor.StatusEnum.SUCCESS,
OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat)),
lastTxnIds));
TransactionCursor.StatusEnum.SUCCESS, getLastTxnDate(dateRangeEnd),
lastTxnIds));
}

transactionPostIngestionService.handleSuccess(transactions);

log.debug("Ingested transactions: {}", transactions);
}

private String getLastTxnDate(OffsetDateTime dateRangeEnd) {
String lastTxnDate = OffsetDateTime.now().format(DateTimeFormatter.ofPattern(dateFormat));
if (dateRangeEnd != null) {
lastTxnDate = dateRangeEnd.format(DateTimeFormatter.ofPattern(dateFormat));
}
return lastTxnDate;
}

private Mono<List<TransactionsPostResponseBody>> handleError(String arrangementId,
boolean pullMode, Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ void ingestionInPullModePatchCursor_Success() {

mockConfigForTransaction();
mockTransactionService();
when(transactionCursorApi.patchByArrangementId(anyString(), any())).thenReturn(Mono.empty());
TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse();
mockCursorApiForTransactions(transactionCursorResponse, false);

TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest();
transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now());
Expand Down Expand Up @@ -248,4 +249,28 @@ void ingestionInPushMode_Success() {
StepVerifier.create(productIngestResponse)
.assertNext(Assertions::assertNotNull).verifyComplete();
}

@Test
void ingestionInPullModePatchCursor_Success_withDates() {

mockConfigForTransaction();
mockTransactionService();
TransactionCursorResponse transactionCursorResponse = mockTransactionCursorResponse();
mockCursorApiForTransactions(transactionCursorResponse, false);

TransactionIngestPullRequest transactionIngestPullRequest = mockTransactionIngestPullRequest();
transactionIngestPullRequest.setDateRangeStart(OffsetDateTime.now().minusDays(10));
transactionIngestPullRequest.setDateRangeEnd(OffsetDateTime.now().minusDays(5));

when(transactionIntegrationService.pullTransactions(transactionIngestPullRequest))
.thenReturn(Flux.just(new TransactionsPostRequestBody().withType("type1").
withArrangementId("1234").withReference("ref")
.withExternalArrangementId("externalArrId")));

Mono<TransactionIngestResponse> productIngestResponse = transactionIngestionService
.ingestPull(transactionIngestPullRequest);
StepVerifier.create(productIngestResponse)
.assertNext(Assertions::assertNotNull)
.verifyComplete();
}
}

0 comments on commit 36d13e5

Please sign in to comment.