Skip to content

Commit

Permalink
[ AJ-1363] Make Cloning/startup multi-replica safe (#356)
Browse files Browse the repository at this point in the history
Make cloning replica-safe by refactoring our initialization logic to use distributed locks. In the code, initCloneMode() returning false causes us to create the default schema, while returning true means we "do nothing" (i.e., just start WDS).
  • Loading branch information
ashanhol authored Oct 16, 2023
1 parent 2a17fcc commit 87cabf3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,62 +79,49 @@ public InstanceInitializerBean(
*/
public void initializeInstance() {
LOGGER.info("Default workspace id loaded as {}.", workspaceId);
boolean isInCloneMode = isInCloneMode(sourceWorkspaceId);
LOGGER.info("isInCloneMode={}.", isInCloneMode);
if (isInCloneMode) {
LOGGER.info("Source workspace id loaded as {}.", sourceWorkspaceId);
boolean cloneSuccess = initCloneMode();
if (cloneSuccess) {
// Enter clone mode if sourceWorkspaceId is specified
if (isInCloneMode(sourceWorkspaceId)) {
LOGGER.info(
"Cloning mode enabled, attempting to clone from {} into {}.",
sourceWorkspaceId,
workspaceId);
// Initialize default schema if initCloneMode() returns false.
if (initCloneMode(sourceWorkspaceId)) {
LOGGER.info("Cloning complete.");
} else {
initializeDefaultInstance();
return;
}
} else {
initializeDefaultInstance();
LOGGER.info("Failed clone state, falling back to initialize default schema.");
}

initializeDefaultInstance();
}

/**
* Determine if this WDS is starting as a clone of some other WDS. If a valid {@code
* SOURCE_WORKSPACE_ID} env var is provided to this WDS, it will start in clone mode.
* Determine if the arguments needed for cloning (in this case, a valid {@code
* SOURCE_WORKSPACE_ID} env var) allow us to proceed with cloning.
*
* @param sourceWorkspaceId value of {@code SOURCE_WORKSPACE_ID}; provided as an argument to
* assist with unit tests.
* @return whether this WDS is a clone.
* @return whether {@code SOURCE_WORKSPACE_ID} is valid.
*/
protected boolean isInCloneMode(String sourceWorkspaceId) {
if (StringUtils.isNotBlank(sourceWorkspaceId)) {
LOGGER.info("SourceWorkspaceId found, checking database");
LOGGER.info("SourceWorkspaceId found, checking validity");
try {
UUID.fromString(sourceWorkspaceId);
} catch (IllegalArgumentException e) {
LOGGER.warn(
"SourceWorkspaceId could not be parsed, unable to clone DB. Provided SourceWorkspaceId: {}.",
sourceWorkspaceId);
"SourceWorkspaceId {} could not be parsed, unable to clone DB.", sourceWorkspaceId);
return false;
}

if (sourceWorkspaceId.equals(workspaceId)) {
LOGGER.warn("SourceWorkspaceId and current WorkspaceId can't be the same.");
return false;
}

try {
// does the default pg schema already exist for this workspace? This should not happen,
// but we want to protect against overwriting it.
boolean instanceAlreadyExists =
instanceDao.instanceSchemaExists(UUID.fromString(workspaceId));
LOGGER.info("isInCloneMode(): instanceAlreadyExists={}", instanceAlreadyExists);
// TODO handle the case where a clone already ran, but failed; should we retry?
return !instanceAlreadyExists;
} catch (IllegalArgumentException e) {
LOGGER.warn(
"WorkspaceId could not be parsed, unable to clone DB. Provided default WorkspaceId: {}.",
workspaceId);
return false;
}
return true;
}
LOGGER.info("No SourceWorkspaceId found, initializing default schema.");
LOGGER.info("No SourceWorkspaceId found, unable to proceed with cloning.");
return false;
}

Expand All @@ -143,29 +130,29 @@ Cloning comes from the concept of copying an original (source) workspace data (f
a newly created (destination) workspace. WDS at start up will always have a current WorkspaceId, which in the
context of cloning will effectively be the destination. The SourceWorkspaceId will only be populated if the currently
starting WDS was initiated via a clone operation and will contain the WorkspaceId of the original workspace where the cloning
was triggered.
was triggered. This function returns false for an incomplete clone.
*/
private boolean initCloneMode() {
protected boolean initCloneMode(String sourceWorkspaceId) {
LOGGER.info("Starting in clone mode...");
UUID trackingId = UUID.randomUUID();
Lock lock = lockRegistry.obtain(sourceWorkspaceId);
try {
// Make sure it's safe to start cloning (in case another replica is trying to clone)
boolean lockAquired = lock.tryLock(1, TimeUnit.SECONDS);
if (!lockAquired) {
LOGGER.info("Failed to acquire lock in initCloneMode");
return false;
boolean lockAcquired = lock.tryLock(1, TimeUnit.SECONDS);
if (!lockAcquired) {
LOGGER.info("Failed to acquire lock in initCloneMode. Exiting clone mode.");
return true;
}

// is a clone operation already running? This can happen when WDS is running with
// multiple replicas and another replica started first and has initiated the clone.
// It can also happen in a corner case where this replica restarted during the clone
// operation.
UUID sourceWorkspaceUuid = UUID.fromString(sourceWorkspaceId);
boolean cloneAlreadyRunning = cloneDao.cloneExistsForWorkspace(sourceWorkspaceUuid);
if (cloneAlreadyRunning) {
LOGGER.info("Clone already running, terminating initCloneMode");
return false;
// Acquiring the lock means other replicas have not started or have finished cloning.
// We can run into an existing clone operation if WDS kicks it off and has to restart,
// or in a multi-replica scenario where one is cloning and another is not.
// If there's a clone entry and no default schema, another replica errored before completing.
// If there's a clone entry and a default schema there's nothing for us to do here.
if (cloneDao.cloneExistsForWorkspace((UUID.fromString(sourceWorkspaceId)))) {
boolean instanceExists = instanceDao.instanceSchemaExists(UUID.fromString(workspaceId));
LOGGER.info("Previous clone entry found. Instance schema exists: {}.", instanceExists);
return instanceExists;
}

// First, create an entry in the clone table to mark cloning has started
Expand Down Expand Up @@ -206,7 +193,7 @@ private boolean initCloneMode() {
LOGGER.error("An error occurred during clone mode. Error: {}", e.toString());
// handle the interrupt if lock was interrupted
if (e instanceof InterruptedException) {
LOGGER.error("Error with acquiring cloning/schema initialization Lock: {}", e.getMessage());
LOGGER.error("Error with acquiring cloning Lock: {}", e.getMessage());
Thread.currentThread().interrupt();
}
try {
Expand Down Expand Up @@ -322,8 +309,7 @@ private void initializeDefaultInstance() {

} catch (IllegalArgumentException e) {
LOGGER.warn(
"Workspace id could not be parsed, a default schema won't be created. Provided id: {}.",
workspaceId);
"Workspace id {} could not be parsed, a default schema won't be created.", workspaceId);
} catch (DataAccessException e) {
LOGGER.error("Failed to create default schema id for workspaceId {}.", workspaceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -65,20 +66,22 @@ class InstanceInitializerBeanTest {
@Autowired InstanceInitializerBean instanceInitializerBean;
@MockBean JdbcLockRegistry registry;
@SpyBean InstanceDao instanceDao;
@SpyBean CloneDao cloneDao;

@Value("${twds.instance.workspace-id}")
String workspaceId;

// sourceWorkspaceId when we need one
final String sourceWorkspaceId = UUID.randomUUID().toString();

// randomly generated UUID
final UUID instanceID = UUID.fromString("90e1b179-9f83-4a6f-a8c2-db083df4cd03");

Lock mockLock = mock(Lock.class);

// JdbcLockRegistry registry = mock(JdbcLockRegistry.class);

@BeforeEach
void setUp() {
when(mockLock.tryLock()).thenReturn(true);
void setUp() throws InterruptedException {
when(mockLock.tryLock(anyLong(), any())).thenReturn(true);
when(registry.obtain(anyString())).thenReturn(mockLock);
}

Expand Down Expand Up @@ -112,6 +115,61 @@ void testSchemaAlreadyExists() {
assertTrue(instanceDao.instanceSchemaExists(instanceID));
}

/**
 * Happy path: the lock can be obtained and cloning goes ahead, leaving a clone entry recorded
 * for the source workspace.
 */
@Test
void cloneSuccessfully() {
  // Precondition: no schema exists yet for the test instance.
  boolean schemaPresent = instanceDao.instanceSchemaExists(instanceID);
  assertFalse(schemaPresent);

  // Run the clone path; the return value is not inspected in this test.
  instanceInitializerBean.initCloneMode(sourceWorkspaceId);

  // A clone entry for the source workspace shows that cloning was started.
  UUID sourceWorkspaceUuid = UUID.fromString(sourceWorkspaceId);
  boolean cloneRecorded = cloneDao.cloneExistsForWorkspace(sourceWorkspaceUuid);
  assertTrue(cloneRecorded);
}

/**
 * Lock unavailable: {@code initCloneMode} is expected to return {@code true} (nothing that
 * would require creating the default schema happened) and to leave no clone entry behind.
 */
@Test
void cloneWithLockFail() throws InterruptedException {
  // Re-stub the timed tryLock so that acquiring the lock fails for this test.
  when(mockLock.tryLock(anyLong(), any())).thenReturn(false);

  // Precondition: no schema exists yet for the test instance.
  boolean schemaPresent = instanceDao.instanceSchemaExists(instanceID);
  assertFalse(schemaPresent);

  // Attempt to clone; without the lock the method should exit cleanly.
  boolean exitedCleanly = instanceInitializerBean.initCloneMode(sourceWorkspaceId);
  assertTrue(exitedCleanly);

  // No clone entry may have been written for the source workspace.
  UUID sourceWorkspaceUuid = UUID.fromString(sourceWorkspaceId);
  boolean cloneRecorded = cloneDao.cloneExistsForWorkspace(sourceWorkspaceUuid);
  assertFalse(cloneRecorded);
}

/**
 * Lock obtainable, but a clone entry already exists for the source workspace and a schema has
 * been created for the test instance: {@code initCloneMode} is expected to return {@code true},
 * i.e. no default schema needs to be created.
 *
 * <p>NOTE(review): presumably {@code instanceID} matches the configured workspace id whose
 * schema {@code initCloneMode} looks up; confirm against the test configuration.
 */
@Test
void cloneWithCloneTableAndInstanceExist() {
  // Arrange: an existing schema plus a pre-existing clone entry for the source workspace.
  instanceDao.createSchema(instanceID);
  UUID entryId = UUID.randomUUID();
  UUID sourceWorkspaceUuid = UUID.fromString(sourceWorkspaceId);
  cloneDao.createCloneEntry(entryId, sourceWorkspaceUuid);

  // Act: run the clone path.
  boolean exitedCleanly = instanceInitializerBean.initCloneMode(sourceWorkspaceId);

  // Assert: nothing left to do, so the method reports true.
  assertTrue(exitedCleanly);
}

/**
 * Lock obtainable and a clone entry already exists for the source workspace, but no schema
 * exists for the test instance: {@code initCloneMode} is expected to return {@code false},
 * signalling that the default schema still has to be created.
 *
 * <p>NOTE(review): presumably {@code instanceID} matches the configured workspace id whose
 * schema {@code initCloneMode} looks up; confirm against the test configuration.
 */
@Test
void cloneWithCloneTableAndNoInstance() {
  // Arrange: only a clone entry for the source workspace is present.
  UUID entryId = UUID.randomUUID();
  UUID sourceWorkspaceUuid = UUID.fromString(sourceWorkspaceId);
  cloneDao.createCloneEntry(entryId, sourceWorkspaceUuid);

  // Precondition: no schema exists for the test instance.
  boolean schemaPresent = instanceDao.instanceSchemaExists(instanceID);
  assertFalse(schemaPresent);

  // Act: run the clone path.
  boolean exitedCleanly = instanceInitializerBean.initCloneMode(sourceWorkspaceId);

  // Assert: the earlier clone did not complete, so the method reports false.
  assertFalse(exitedCleanly);
}

@Test
void sourceWorkspaceIDNotProvided() {
boolean cloneMode = instanceInitializerBean.isInCloneMode(null);
Expand All @@ -127,13 +185,6 @@ void blankSourceWorkspaceID() {
assertFalse(cloneMode);
}

/**
 * With a schema already created for the test instance, {@code isInCloneMode} is expected to
 * return {@code false} even when handed a well-formed, freshly generated source workspace id.
 */
@Test
void sourceWorkspaceSchemaExists() {
  // Arrange: make the instance schema exist before asking about clone mode.
  instanceDao.createSchema(instanceID);

  // Act + assert: a valid random source id must still not enable clone mode.
  String candidateSourceId = UUID.randomUUID().toString();
  assertFalse(instanceInitializerBean.isInCloneMode(candidateSourceId));
}

@Test
void sourceWorkspaceIDCorrect() {
boolean cloneMode = instanceInitializerBean.isInCloneMode(UUID.randomUUID().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -63,8 +64,8 @@ class InstanceInitializerNoWorkspaceIdTest {
Lock mockLock = mock(Lock.class);

@BeforeEach
void setUp() {
when(mockLock.tryLock()).thenReturn(true);
void setUp() throws InterruptedException {
when(mockLock.tryLock(anyLong(), any())).thenReturn(true);
when(registry.obtain(anyString())).thenReturn(mockLock);
}

Expand Down

0 comments on commit 87cabf3

Please sign in to comment.