Skip to content

Commit

Permalink
MET-6239 fix consistency when processing one dataset at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
jeortizquan committed Nov 18, 2024
1 parent 3c538f1 commit 996591e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.support.locks.LockRegistry;

/**
* The type DeBias config.
Expand Down Expand Up @@ -46,13 +47,15 @@ public DeBiasStateService debiasMachine(DatasetDeBiasRepository datasetDeBiasRep
RecordLogRepository recordLogRepository,
RecordDeBiasPublishable recordDeBiasPublishable,
RecordDeBiasMainRepository recordDeBiasMainRepository,
RecordDeBiasDetailRepository recordDeBiasDetailRepository) {
RecordDeBiasDetailRepository recordDeBiasDetailRepository,
LockRegistry lockRegistry) {
return new DeBiasStateServiceImpl(datasetDeBiasRepository,
datasetRepository,
recordLogRepository,
recordDeBiasPublishable,
recordDeBiasMainRepository,
recordDeBiasDetailRepository);
recordDeBiasDetailRepository,
lockRegistry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.transaction.annotation.Transactional;

/**
Expand All @@ -41,6 +45,8 @@ public class DeBiasStateServiceImpl implements DeBiasStateService {
private final DatasetRepository datasetRepository;
private final RecordLogRepository recordLogRepository;
private final RecordDeBiasPublishable recordDeBiasPublishable;
private final Map<Integer, Lock> datasetIdLocksMap = new ConcurrentHashMap<>();
private final LockRegistry lockRegistry;

/**
* Instantiates a new DeBias detect service.
Expand All @@ -51,26 +57,33 @@ public class DeBiasStateServiceImpl implements DeBiasStateService {
* @param recordDeBiasPublishable the record publishable
* @param recordDeBiasMainRepository the record de bias main repository
* @param recordDeBiasDetailRepository the record de bias detail repository
* @param lockRegistry the lock registry
*/
public DeBiasStateServiceImpl(DatasetDeBiasRepository datasetDeBiasRepository,
DatasetRepository datasetRepository,
RecordLogRepository recordLogRepository,
RecordDeBiasPublishable recordDeBiasPublishable,
RecordDeBiasMainRepository recordDeBiasMainRepository,
RecordDeBiasDetailRepository recordDeBiasDetailRepository) {
RecordDeBiasDetailRepository recordDeBiasDetailRepository,
LockRegistry lockRegistry) {
this.datasetDeBiasRepository = datasetDeBiasRepository;
this.recordDeBiasMainRepository = recordDeBiasMainRepository;
this.recordDeBiasDetailRepository = recordDeBiasDetailRepository;
this.datasetRepository = datasetRepository;
this.recordLogRepository = recordLogRepository;
this.recordDeBiasPublishable = recordDeBiasPublishable;
this.lockRegistry = lockRegistry;
}

@Transactional
@Override
public boolean process(Integer datasetId) {
final Lock lock = datasetIdLocksMap.computeIfAbsent(datasetId, s -> lockRegistry.obtain("debiasProcess_" + datasetId));
LOGGER.info("{} {}", READY_STATE, datasetId);

try {
lock.lock();
LOGGER.info("DeBias processing: {} lock, Locked", datasetId);
DatasetDeBiasEntity dataset = getDatasetDeBiasEntity(datasetId);

processDatasetAndPublishToDeBiasReadyQueue(dataset);
Expand All @@ -79,6 +92,9 @@ public boolean process(Integer datasetId) {
} catch (RuntimeException e) {
LOGGER.warn("fail {} {}", READY_STATE, datasetId, e);
return false;
} finally {
lock.unlock();
LOGGER.info("DeBias processing: {} lock, Unlocked", datasetId);
}
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.integration.support.locks.LockRegistry;

@ExtendWith(MockitoExtension.class)
class DeBiasStateServiceImplTest {
Expand All @@ -61,6 +63,9 @@ class DeBiasStateServiceImplTest {
@Mock
RecordDeBiasDetailRepository recordDeBiasDetailRepository;

@Mock
LockRegistry lockRegistry;

@InjectMocks
DeBiasStateServiceImpl debiasStateServiceImpl;

Expand All @@ -72,7 +77,7 @@ void processWhenDatasetNotExists_expectSuccess() {
datasetDeBiasEntity.setCreatedDate(ZonedDateTime.now());

when(datasetRepository.findById(anyInt())).thenThrow(NoSuchElementException.class);

when(lockRegistry.obtain(anyString())).thenReturn(new ReentrantLock());
boolean result = debiasStateServiceImpl.process(datasetId);

assertFalse(result);
Expand All @@ -82,6 +87,7 @@ void processWhenDatasetNotExists_expectSuccess() {
verify(datasetDeBiasRepository, times(0)).updateState(anyInt(), anyString());
verify(recordLogRepository, times(0)).findRecordLogByDatasetIdAndStep(anyString(),any());
verify(recordDeBiasPublishable, times(0)).publishToDeBiasQueue(any());
verify(lockRegistry, times(1)).obtain(anyString());
}


Expand All @@ -107,7 +113,7 @@ void processWhenNewHappyPath_Ready_Processing_Completed_expectSuccess() {
.thenReturn(null)
.thenReturn(datasetDeBiasEntity)
.thenReturn(datasetDeBiasEntity);

when(lockRegistry.obtain(anyString())).thenReturn(new ReentrantLock());
boolean result = debiasStateServiceImpl.process(datasetId);

assertTrue(result);
Expand All @@ -117,6 +123,7 @@ void processWhenNewHappyPath_Ready_Processing_Completed_expectSuccess() {
verify(datasetDeBiasRepository, times(0)).updateState(anyInt(), anyString());
verify(recordLogRepository, times(1)).findRecordLogByDatasetIdAndStep(anyString(),any());
verify(recordDeBiasPublishable, times(1)).publishToDeBiasQueue(any());
verify(lockRegistry, times(1)).obtain(anyString());
}

@Test
Expand All @@ -131,14 +138,15 @@ void processWhenNewHappyPath_Ready_andError_expectSuccess() {
when(datasetRepository.findById(anyInt())).thenReturn(Optional.of(datasetEntity));
when(datasetDeBiasRepository.findDetectionEntityByDatasetIdDatasetId(anyInt()))
.thenThrow(new RuntimeException("Error"));

when(lockRegistry.obtain(anyString())).thenReturn(new ReentrantLock());
boolean result = debiasStateServiceImpl.process(datasetId);

assertFalse(result);

verify(datasetDeBiasRepository, times(1)).findDetectionEntityByDatasetIdDatasetId(datasetId);
verify(datasetDeBiasRepository, times(0)).save(any(DatasetDeBiasEntity.class));
verify(datasetDeBiasRepository, times(0)).updateState(anyInt(), anyString());
verify(lockRegistry, times(1)).obtain(anyString());
}

@Test
Expand Down

0 comments on commit 996591e

Please sign in to comment.