Skip to content

Commit

Permalink
Fix sweeping emitted refunds events for hstoric charges
Browse files Browse the repository at this point in the history
- Emitted events sweeper for refunds currently relies on related charge being in connector database.
  But we should be able to emit refund events for expunged charges. This fixes backfilling refund events for historic charges
- Also took opportunity to split full backfill (charges + refunds) for a given event. For a charge event, only payment events are emitted and for refund event, only refund events are emitted. This can be improved further to  emit specific event only.
- In case of any exception when processing event, event is marked to not retry instead of throwing exception. The emitted events sweeper continues emitting pending events.
  • Loading branch information
kbottla committed Sep 28, 2020
1 parent 7834390 commit dba2c74
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void setId(Long id) {
this.id = id;
}

public void setDoNotRetryEmitUntil(ZonedDateTime doNotRetryEmitUntil) {
this.doNotRetryEmitUntil = doNotRetryEmitUntil;
}

@Override
public String toString() {
return "EmittedEventEntity{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
import org.slf4j.MDC;
import uk.gov.pay.connector.app.ConnectorConfiguration;
import uk.gov.pay.connector.app.config.EmittedEventSweepConfig;
import uk.gov.pay.connector.charge.dao.ChargeDao;
import uk.gov.pay.connector.charge.model.domain.ChargeEntity;
import uk.gov.pay.connector.charge.service.ChargeService;
import uk.gov.pay.connector.events.dao.EmittedEventDao;
import uk.gov.pay.connector.events.model.ResourceType;
import uk.gov.pay.connector.queue.statetransition.StateTransitionService;
import uk.gov.pay.connector.refund.dao.RefundDao;
import uk.gov.pay.connector.refund.model.domain.RefundEntity;
import uk.gov.pay.connector.tasks.HistoricalEventEmitter;
Expand All @@ -20,7 +18,9 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;

import static java.time.ZoneOffset.UTC;
import static java.time.ZonedDateTime.now;
import static uk.gov.pay.logging.LoggingKeys.PAYMENT_EXTERNAL_ID;

public class EmittedEventsBackfillService {
private final Logger logger = LoggerFactory.getLogger(getClass());
Expand All @@ -31,19 +31,18 @@ public class EmittedEventsBackfillService {
private final HistoricalEventEmitter historicalEventEmitter;
private RefundDao refundDao;
private final EmittedEventSweepConfig sweepConfig;
private static final boolean shouldForceEmission = true;
private long doNotRetryEmittingEventUntilDurationInSeconds;

@Inject
public EmittedEventsBackfillService(EmittedEventDao emittedEventDao, ChargeService chargeService, RefundDao refundDao,
ChargeDao chargeDao, EventService eventService,
StateTransitionService stateTransitionService,
ConnectorConfiguration configuration) {
HistoricalEventEmitter historicalEventEmitter, ConnectorConfiguration configuration) {
this.emittedEventDao = emittedEventDao;
this.chargeService = chargeService;
this.refundDao = refundDao;
this.sweepConfig = configuration.getEmittedEventSweepConfig();
this.historicalEventEmitter = new HistoricalEventEmitter(emittedEventDao, refundDao, chargeService, shouldForceEmission,
eventService, stateTransitionService);
this.doNotRetryEmittingEventUntilDurationInSeconds = configuration.getEventEmitterConfig()
.getDefaultDoNotRetryEmittingEventUntilDurationInSeconds();
this.historicalEventEmitter = historicalEventEmitter;
}

public void backfillNotEmittedEvents() {
Expand All @@ -52,25 +51,31 @@ public void backfillNotEmittedEvents() {
emittedEventBatchIterator.forEachRemaining(batch -> {
logger.info(
"Processing not emitted events [lastProcessedId={}, no.of.events={}, oldestDate={}]",
batch.getStartId(),
batch.getEndId().map(Object::toString).orElse("none"),
batch.getStartId(),
batch.getEndId().map(Object::toString).orElse("none"),
batch.oldestEventDate().map(ZonedDateTime::toString).orElse("none")
);

batch.getEvents().forEach(this::backfillEvent);
});

logger.info("Finished processing not emitted events [lastProcessedId={}, maxId={}]",
emittedEventBatchIterator.getCurrentBatchStartId(), emittedEventBatchIterator.getMaximumIdOfEventsEligibleForReEmission().map(Object::toString).orElse("none"));
emittedEventBatchIterator.getCurrentBatchStartId(), emittedEventBatchIterator
.getMaximumIdOfEventsEligibleForReEmission().map(Object::toString).orElse("none"));
}

@Transactional
public void backfillEvent(EmittedEventEntity event) {
try {
String chargeId = chargeIdForEvent(event);
ChargeEntity charge = chargeService.findChargeByExternalId(chargeId);
MDC.put("chargeId", charge.getExternalId());
historicalEventEmitter.processPaymentAndRefundEvents(charge);

MDC.put(PAYMENT_EXTERNAL_ID, chargeId);
if (isPaymentEvent(event)) {
ChargeEntity chargeEntity = chargeService.findChargeByExternalId(chargeId);
historicalEventEmitter.processPaymentEvents(chargeEntity, true);
} else {
historicalEventEmitter.emitEventsForRefund(event.getResourceExternalId(), true);
}
event.setEmittedDate(now(ZoneId.of("UTC")));
} catch (Exception e) {
logger.error(
Expand All @@ -81,9 +86,9 @@ public void backfillEvent(EmittedEventEntity event) {
event.getEventType(),
event.getEventDate()
);
throw e;
event.setDoNotRetryEmitUntil(ZonedDateTime.now(UTC).plusSeconds(doNotRetryEmittingEventUntilDurationInSeconds));
} finally {
MDC.remove("chargeId");
MDC.remove(PAYMENT_EXTERNAL_ID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.LoggerFactory;
import uk.gov.pay.connector.app.ConnectorConfiguration;
import uk.gov.pay.connector.app.config.EmittedEventSweepConfig;
import uk.gov.pay.connector.app.config.EventEmitterConfig;
import uk.gov.pay.connector.charge.dao.ChargeDao;
import uk.gov.pay.connector.charge.exception.ChargeNotFoundRuntimeException;
import uk.gov.pay.connector.charge.model.domain.Charge;
Expand All @@ -29,13 +30,14 @@
import uk.gov.pay.connector.queue.statetransition.StateTransitionService;
import uk.gov.pay.connector.refund.dao.RefundDao;
import uk.gov.pay.connector.refund.model.domain.RefundEntity;
import uk.gov.pay.connector.tasks.HistoricalEventEmitter;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -80,13 +82,20 @@ public class EmittedEventsBackfillServiceTest {
public void setUp() {
var sweepConfig = mock(EmittedEventSweepConfig.class);
when(sweepConfig.getNotEmittedEventMaxAgeInSeconds()).thenReturn(1800);

EventEmitterConfig mockEventEmitterConfig = mock(EventEmitterConfig.class);
when(mockEventEmitterConfig.getDefaultDoNotRetryEmittingEventUntilDurationInSeconds()).thenReturn(60L);

when(connectorConfiguration.getEmittedEventSweepConfig()).thenReturn(sweepConfig);
when(connectorConfiguration.getEventEmitterConfig()).thenReturn(mockEventEmitterConfig);
Logger root = (Logger) LoggerFactory.getLogger(EmittedEventsBackfillService.class);
mockAppender = mock(Appender.class);
root.setLevel(Level.INFO);
root.addAppender(mockAppender);
emittedEventsBackfillService = new EmittedEventsBackfillService(emittedEventDao, chargeService, refundDao, chargeDao,
eventService, stateTransitionService, connectorConfiguration);
HistoricalEventEmitter historicalEventEmitter = new HistoricalEventEmitter(emittedEventDao, refundDao,
chargeService, true, eventService, stateTransitionService);
emittedEventsBackfillService = new EmittedEventsBackfillService(emittedEventDao, chargeService, refundDao,
historicalEventEmitter, connectorConfiguration);
lenient().when(chargeService.findChargeByExternalId(any())).thenThrow(new ChargeNotFoundRuntimeException(""));
chargeEntity = ChargeEntityFixture
.aValidChargeEntity()
Expand All @@ -102,13 +111,13 @@ public void setUp() {
when(refundEntity.getChargeExternalId()).thenReturn(chargeEntity.getExternalId());
when(emittedEventDao.findNotEmittedEventMaxIdOlderThan(any(ZonedDateTime.class), any())).thenReturn(Optional.of(maxId));
}

@Test
public void logsMessageWhenNoEmittedEventsSatisfyingCriteria() {
when(emittedEventDao.findNotEmittedEventMaxIdOlderThan(any(ZonedDateTime.class), any())).thenReturn(Optional.empty());

emittedEventsBackfillService.backfillNotEmittedEvents();

verify(emittedEventDao, never()).findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), anyLong(), eq(maxId), any());
verify(mockAppender, times(1)).doAppend(loggingEventArgumentCaptor.capture());
List<LoggingEvent> loggingEvents = loggingEventArgumentCaptor.getAllValues();
Expand Down Expand Up @@ -142,10 +151,10 @@ public void backfillsEventsWhenEmittedRefundEventSatisfyingCriteria() {
.build();
when(emittedEventDao.findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any())).thenReturn(List.of(emittedEvent));
when(refundDao.findByExternalId(refundEntity.getExternalId())).thenReturn(Optional.of(refundEntity));
when(refundDao.searchAllHistoryByChargeExternalId(chargeEntity.getExternalId())).thenReturn(List.of(refundHistory));
doReturn(chargeEntity).when(chargeService).findChargeByExternalId(chargeEntity.getExternalId());
when(refundDao.getRefundHistoryByRefundExternalId(refundEntity.getExternalId())).thenReturn(List.of(refundHistory));
doReturn(Optional.of(Charge.from(chargeEntity))).when(chargeService).findCharge(chargeEntity.getExternalId());
chargeEntity.getEvents().clear();

emittedEventsBackfillService.backfillNotEmittedEvents();

verify(emittedEventDao, times(1)).findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any());
Expand All @@ -171,17 +180,14 @@ public void backfillsEventsWhenEmittedEventsSatisfyingCriteria() {
when(emittedEventDao.findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any())).thenReturn(List.of(emittedPaymentEvent, emittedRefundEvent));
doReturn(chargeEntity).when(chargeService).findChargeByExternalId(chargeEntity.getExternalId());
when(refundDao.findByExternalId(refundEntity.getExternalId())).thenReturn(Optional.of(refundEntity));
when(refundDao.searchAllHistoryByChargeExternalId(chargeEntity.getExternalId())).thenReturn(List.of(refundHistory));
when(refundDao.getRefundHistoryByRefundExternalId(refundEntity.getExternalId())).thenReturn(List.of(refundHistory));
doReturn(Optional.of(Charge.from(chargeEntity))).when(chargeService).findCharge(chargeEntity.getExternalId());

emittedEventsBackfillService.backfillNotEmittedEvents();

verify(emittedEventDao, times(1)).findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any());
// Each event triggers a full backfill for the charge entity and associated refunds.
// Not clear if this is useful behaviour but that's as-implemented. Since there is one charge and one refund
// there are two events associated. Because there are two events, these two events get emitted two times, hence
// four event emissions in total
verify(stateTransitionService, times(4)).offerStateTransition(any(), any(), isNull());
// 2 events emitted for payment event and refund event
verify(stateTransitionService, times(2)).offerStateTransition(any(), any(), isNull());
verify(mockAppender, times(2)).doAppend(loggingEventArgumentCaptor.capture());
List<LoggingEvent> loggingEvents = loggingEventArgumentCaptor.getAllValues();
assertThat(loggingEvents.get(0).getFormattedMessage(), is("Processing not emitted events [lastProcessedId=0, no.of.events=2, oldestDate=2019-09-20T09:00Z]"));
Expand Down

0 comments on commit dba2c74

Please sign in to comment.