Skip to content

Commit

Permalink
Merge pull request #2562 from alphagov/fix_emitted_event_backfill
Browse files Browse the repository at this point in the history
PP-7224 Fix sweeping emitted refunds events for historic charges
  • Loading branch information
kbottla authored Sep 28, 2020
2 parents 7834390 + dba2c74 commit 036c211
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public void setId(Long id) {
this.id = id;
}

public void setDoNotRetryEmitUntil(ZonedDateTime doNotRetryEmitUntil) {
this.doNotRetryEmitUntil = doNotRetryEmitUntil;
}

@Override
public String toString() {
return "EmittedEventEntity{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
import org.slf4j.MDC;
import uk.gov.pay.connector.app.ConnectorConfiguration;
import uk.gov.pay.connector.app.config.EmittedEventSweepConfig;
import uk.gov.pay.connector.charge.dao.ChargeDao;
import uk.gov.pay.connector.charge.model.domain.ChargeEntity;
import uk.gov.pay.connector.charge.service.ChargeService;
import uk.gov.pay.connector.events.dao.EmittedEventDao;
import uk.gov.pay.connector.events.model.ResourceType;
import uk.gov.pay.connector.queue.statetransition.StateTransitionService;
import uk.gov.pay.connector.refund.dao.RefundDao;
import uk.gov.pay.connector.refund.model.domain.RefundEntity;
import uk.gov.pay.connector.tasks.HistoricalEventEmitter;
Expand All @@ -20,7 +18,9 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;

import static java.time.ZoneOffset.UTC;
import static java.time.ZonedDateTime.now;
import static uk.gov.pay.logging.LoggingKeys.PAYMENT_EXTERNAL_ID;

public class EmittedEventsBackfillService {
private final Logger logger = LoggerFactory.getLogger(getClass());
Expand All @@ -31,19 +31,18 @@ public class EmittedEventsBackfillService {
private final HistoricalEventEmitter historicalEventEmitter;
private RefundDao refundDao;
private final EmittedEventSweepConfig sweepConfig;
private static final boolean shouldForceEmission = true;
private long doNotRetryEmittingEventUntilDurationInSeconds;

@Inject
public EmittedEventsBackfillService(EmittedEventDao emittedEventDao, ChargeService chargeService, RefundDao refundDao,
ChargeDao chargeDao, EventService eventService,
StateTransitionService stateTransitionService,
ConnectorConfiguration configuration) {
HistoricalEventEmitter historicalEventEmitter, ConnectorConfiguration configuration) {
this.emittedEventDao = emittedEventDao;
this.chargeService = chargeService;
this.refundDao = refundDao;
this.sweepConfig = configuration.getEmittedEventSweepConfig();
this.historicalEventEmitter = new HistoricalEventEmitter(emittedEventDao, refundDao, chargeService, shouldForceEmission,
eventService, stateTransitionService);
this.doNotRetryEmittingEventUntilDurationInSeconds = configuration.getEventEmitterConfig()
.getDefaultDoNotRetryEmittingEventUntilDurationInSeconds();
this.historicalEventEmitter = historicalEventEmitter;
}

public void backfillNotEmittedEvents() {
Expand All @@ -52,25 +51,31 @@ public void backfillNotEmittedEvents() {
emittedEventBatchIterator.forEachRemaining(batch -> {
logger.info(
"Processing not emitted events [lastProcessedId={}, no.of.events={}, oldestDate={}]",
batch.getStartId(),
batch.getEndId().map(Object::toString).orElse("none"),
batch.getStartId(),
batch.getEndId().map(Object::toString).orElse("none"),
batch.oldestEventDate().map(ZonedDateTime::toString).orElse("none")
);

batch.getEvents().forEach(this::backfillEvent);
});

logger.info("Finished processing not emitted events [lastProcessedId={}, maxId={}]",
emittedEventBatchIterator.getCurrentBatchStartId(), emittedEventBatchIterator.getMaximumIdOfEventsEligibleForReEmission().map(Object::toString).orElse("none"));
emittedEventBatchIterator.getCurrentBatchStartId(), emittedEventBatchIterator
.getMaximumIdOfEventsEligibleForReEmission().map(Object::toString).orElse("none"));
}

@Transactional
public void backfillEvent(EmittedEventEntity event) {
try {
String chargeId = chargeIdForEvent(event);
ChargeEntity charge = chargeService.findChargeByExternalId(chargeId);
MDC.put("chargeId", charge.getExternalId());
historicalEventEmitter.processPaymentAndRefundEvents(charge);

MDC.put(PAYMENT_EXTERNAL_ID, chargeId);
if (isPaymentEvent(event)) {
ChargeEntity chargeEntity = chargeService.findChargeByExternalId(chargeId);
historicalEventEmitter.processPaymentEvents(chargeEntity, true);
} else {
historicalEventEmitter.emitEventsForRefund(event.getResourceExternalId(), true);
}
event.setEmittedDate(now(ZoneId.of("UTC")));
} catch (Exception e) {
logger.error(
Expand All @@ -81,9 +86,9 @@ public void backfillEvent(EmittedEventEntity event) {
event.getEventType(),
event.getEventDate()
);
throw e;
event.setDoNotRetryEmitUntil(ZonedDateTime.now(UTC).plusSeconds(doNotRetryEmittingEventUntilDurationInSeconds));
} finally {
MDC.remove("chargeId");
MDC.remove(PAYMENT_EXTERNAL_ID);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.LoggerFactory;
import uk.gov.pay.connector.app.ConnectorConfiguration;
import uk.gov.pay.connector.app.config.EmittedEventSweepConfig;
import uk.gov.pay.connector.app.config.EventEmitterConfig;
import uk.gov.pay.connector.charge.dao.ChargeDao;
import uk.gov.pay.connector.charge.exception.ChargeNotFoundRuntimeException;
import uk.gov.pay.connector.charge.model.domain.Charge;
Expand All @@ -29,13 +30,14 @@
import uk.gov.pay.connector.queue.statetransition.StateTransitionService;
import uk.gov.pay.connector.refund.dao.RefundDao;
import uk.gov.pay.connector.refund.model.domain.RefundEntity;
import uk.gov.pay.connector.tasks.HistoricalEventEmitter;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -80,13 +82,20 @@ public class EmittedEventsBackfillServiceTest {
public void setUp() {
var sweepConfig = mock(EmittedEventSweepConfig.class);
when(sweepConfig.getNotEmittedEventMaxAgeInSeconds()).thenReturn(1800);

EventEmitterConfig mockEventEmitterConfig = mock(EventEmitterConfig.class);
when(mockEventEmitterConfig.getDefaultDoNotRetryEmittingEventUntilDurationInSeconds()).thenReturn(60L);

when(connectorConfiguration.getEmittedEventSweepConfig()).thenReturn(sweepConfig);
when(connectorConfiguration.getEventEmitterConfig()).thenReturn(mockEventEmitterConfig);
Logger root = (Logger) LoggerFactory.getLogger(EmittedEventsBackfillService.class);
mockAppender = mock(Appender.class);
root.setLevel(Level.INFO);
root.addAppender(mockAppender);
emittedEventsBackfillService = new EmittedEventsBackfillService(emittedEventDao, chargeService, refundDao, chargeDao,
eventService, stateTransitionService, connectorConfiguration);
HistoricalEventEmitter historicalEventEmitter = new HistoricalEventEmitter(emittedEventDao, refundDao,
chargeService, true, eventService, stateTransitionService);
emittedEventsBackfillService = new EmittedEventsBackfillService(emittedEventDao, chargeService, refundDao,
historicalEventEmitter, connectorConfiguration);
lenient().when(chargeService.findChargeByExternalId(any())).thenThrow(new ChargeNotFoundRuntimeException(""));
chargeEntity = ChargeEntityFixture
.aValidChargeEntity()
Expand All @@ -102,13 +111,13 @@ public void setUp() {
when(refundEntity.getChargeExternalId()).thenReturn(chargeEntity.getExternalId());
when(emittedEventDao.findNotEmittedEventMaxIdOlderThan(any(ZonedDateTime.class), any())).thenReturn(Optional.of(maxId));
}

@Test
public void logsMessageWhenNoEmittedEventsSatisfyingCriteria() {
when(emittedEventDao.findNotEmittedEventMaxIdOlderThan(any(ZonedDateTime.class), any())).thenReturn(Optional.empty());

emittedEventsBackfillService.backfillNotEmittedEvents();

verify(emittedEventDao, never()).findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), anyLong(), eq(maxId), any());
verify(mockAppender, times(1)).doAppend(loggingEventArgumentCaptor.capture());
List<LoggingEvent> loggingEvents = loggingEventArgumentCaptor.getAllValues();
Expand Down Expand Up @@ -142,10 +151,10 @@ public void backfillsEventsWhenEmittedRefundEventSatisfyingCriteria() {
.build();
when(emittedEventDao.findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any())).thenReturn(List.of(emittedEvent));
when(refundDao.findByExternalId(refundEntity.getExternalId())).thenReturn(Optional.of(refundEntity));
when(refundDao.searchAllHistoryByChargeExternalId(chargeEntity.getExternalId())).thenReturn(List.of(refundHistory));
doReturn(chargeEntity).when(chargeService).findChargeByExternalId(chargeEntity.getExternalId());
when(refundDao.getRefundHistoryByRefundExternalId(refundEntity.getExternalId())).thenReturn(List.of(refundHistory));
doReturn(Optional.of(Charge.from(chargeEntity))).when(chargeService).findCharge(chargeEntity.getExternalId());
chargeEntity.getEvents().clear();

emittedEventsBackfillService.backfillNotEmittedEvents();

verify(emittedEventDao, times(1)).findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any());
Expand All @@ -171,17 +180,14 @@ public void backfillsEventsWhenEmittedEventsSatisfyingCriteria() {
when(emittedEventDao.findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any())).thenReturn(List.of(emittedPaymentEvent, emittedRefundEvent));
doReturn(chargeEntity).when(chargeService).findChargeByExternalId(chargeEntity.getExternalId());
when(refundDao.findByExternalId(refundEntity.getExternalId())).thenReturn(Optional.of(refundEntity));
when(refundDao.searchAllHistoryByChargeExternalId(chargeEntity.getExternalId())).thenReturn(List.of(refundHistory));
when(refundDao.getRefundHistoryByRefundExternalId(refundEntity.getExternalId())).thenReturn(List.of(refundHistory));
doReturn(Optional.of(Charge.from(chargeEntity))).when(chargeService).findCharge(chargeEntity.getExternalId());

emittedEventsBackfillService.backfillNotEmittedEvents();

verify(emittedEventDao, times(1)).findNotEmittedEventsOlderThan(any(ZonedDateTime.class), anyInt(), eq(0L), eq(maxId), any());
// Each event triggers a full backfill for the charge entity and associated refunds.
// Not clear if this is useful behaviour but that's as-implemented. Since there is one charge and one refund
// there are two events associated. Because there are two events, these two events get emitted two times, hence
// four event emissions in total
verify(stateTransitionService, times(4)).offerStateTransition(any(), any(), isNull());
// 2 events emitted for payment event and refund event
verify(stateTransitionService, times(2)).offerStateTransition(any(), any(), isNull());
verify(mockAppender, times(2)).doAppend(loggingEventArgumentCaptor.capture());
List<LoggingEvent> loggingEvents = loggingEventArgumentCaptor.getAllValues();
assertThat(loggingEvents.get(0).getFormattedMessage(), is("Processing not emitted events [lastProcessedId=0, no.of.events=2, oldestDate=2019-09-20T09:00Z]"));
Expand Down

0 comments on commit 036c211

Please sign in to comment.