Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AJ-1371: link snapshots during AnVIL pfb import #386

Merged
merged 21 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import org.apache.avro.generic.GenericRecord;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.jobexec.QuartzJob;
import org.databiosphere.workspacedataservice.retry.RestClientRetry;
import org.databiosphere.workspacedataservice.service.model.exception.PfbParsingException;
import org.databiosphere.workspacedataservice.workspacemanager.WorkspaceManagerDao;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/** Shell/starting point for PFB import via Quartz. */
Expand All @@ -36,9 +38,15 @@ public class PfbQuartzJob extends QuartzJob {
private final JobDao jobDao;
private final WorkspaceManagerDao wsmDao;

public PfbQuartzJob(JobDao jobDao, WorkspaceManagerDao wsmDao) {
@Value("${twds.instance.workspace-id}")
davidangb marked this conversation as resolved.
Show resolved Hide resolved
UUID workspaceId;

private final RestClientRetry restClientRetry;

/**
 * Construct the PFB-import Quartz job.
 *
 * @param jobDao dao used to track this job's status
 * @param wsmDao dao for Workspace Manager (WSM) calls
 * @param restClientRetry retry/error-handling wrapper applied to WSM REST calls
 */
public PfbQuartzJob(JobDao jobDao, WorkspaceManagerDao wsmDao, RestClientRetry restClientRetry) {
  this.jobDao = jobDao;
  this.wsmDao = wsmDao;
  this.restClientRetry = restClientRetry;
}

@Override
Expand All @@ -59,7 +67,7 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
false);

// process the stream into a list of unique snapshotIds
List<String> snapshotIds =
List<UUID> snapshotIds =
recordStream
.map(rec -> rec.get("object")) // Records in a pfb are stored under the key "object"
.filter(GenericRecord.class::isInstance) // which we expect to be a GenericRecord
Expand All @@ -70,18 +78,13 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
.map(obj -> obj.get(SNAPSHOT_ID_IDENTIFIER)) // within the GenericRecord, find the
// source_datarepo_snapshot_id
.filter(Objects::nonNull) // expect source_datarepo_snapshot_id to be non-null
.map(Object::toString)
.map(obj -> maybeUuid(obj.toString()))
.filter(Objects::nonNull)
.distinct() // find only the unique snapshotids
.toList();

// TODO AJ-1371 pass snapshotIds to WSM
for (String id : snapshotIds) {
try {
wsmDao.createDataRepoSnapshotReference(new SnapshotModel().id(UUID.fromString(id)));
} catch (Exception e) {
throw new PfbParsingException("Error processing PFB: Invalid snapshot UUID", e);
}
}
// link the found snapshots to the workspace, skipping any that were previously linked
linkSnapshots(snapshotIds);

} catch (IOException e) {
throw new PfbParsingException("Error processing PFB", e);
Expand All @@ -90,4 +93,42 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
// TODO: AJ-1227 implement PFB import.
logger.info("TODO: implement PFB import.");
}

protected void linkSnapshots(List<UUID> snapshotIds) {
// list existing snapshots linked to this workspace
PfbQuartzJobSupport pfbQuartzJobSupport =
new PfbQuartzJobSupport(workspaceId, wsmDao, restClientRetry);
List<UUID> existingSnapshotIds =
calypsomatic marked this conversation as resolved.
Show resolved Hide resolved
pfbQuartzJobSupport.existingPolicySnapshotIds(/* pageSize= */ 50);
Copy link
Contributor

@jladieu jladieu Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the arg comment ❤️

// find the snapshots in this PFB that are not already linked to this workspace
List<UUID> newSnapshotIds =
snapshotIds.stream().filter(id -> !existingSnapshotIds.contains(id)).toList();

logger.info(
"PFB contains {} snapshot ids. {} of these are already linked to the workspace; {} new links will be created.",
snapshotIds.size(),
snapshotIds.size() - newSnapshotIds.size(),
newSnapshotIds.size());

// pass snapshotIds to WSM
for (UUID uuid : newSnapshotIds) {
try {
RestClientRetry.VoidRestCall voidRestCall =
(() -> wsmDao.createDataRepoSnapshotReference(new SnapshotModel().id(uuid)));
restClientRetry.withRetryAndErrorHandling(
voidRestCall, "WSM.createDataRepoSnapshotReference");
} catch (Exception e) {
throw new PfbParsingException("Error processing PFB: Invalid snapshot UUID", e);
}
}
}

private UUID maybeUuid(String input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly a place for an Optional instead of a null? (disclosure: I generally steer clear of nulls in Java if possible)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm gonna push back here - I think that's sound advice to use Optional instead of null for public methods, but this is a private method used only within the class. If it returned Optional, it would require additional steps inside of executeInternal(). Whereas now we can use one step .filter(Objects::nonNull), we'd need to change to .filter(Optional::isPresent).map(Optional::get).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine, particularly making the distinction between public and private APIs. I have a hunch that there'd be some optional-friendly predicate that could stand in for (and have equivalent terseness to) what's already there, but don't know off the top of my head what it'd be so carry on!

try {
return UUID.fromString(input);
} catch (Exception e) {
logger.warn("found unparseable snapshot id '{}' in PFB contents", input);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.databiosphere.workspacedataservice.dataimport;

import bio.terra.workspace.model.DataRepoSnapshotAttributes;
import bio.terra.workspace.model.ResourceAttributesUnion;
import bio.terra.workspace.model.ResourceDescription;
import bio.terra.workspace.model.ResourceList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.databiosphere.workspacedataservice.retry.RestClientRetry;
import org.databiosphere.workspacedataservice.service.model.exception.PfbImportException;
import org.databiosphere.workspacedataservice.workspacemanager.WorkspaceManagerDao;

public class PfbQuartzJobSupport {
davidangb marked this conversation as resolved.
Show resolved Hide resolved

private final UUID workspaceId;
private final WorkspaceManagerDao wsmDao;
private final RestClientRetry restClientRetry;

public PfbQuartzJobSupport(
UUID workspaceId, WorkspaceManagerDao wsmDao, RestClientRetry restClientRetry) {
this.workspaceId = workspaceId;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
}

/**
* Query WSM for the full list of referenced snapshots in this workspace, then return the list of
* unique snapshotIds from those references.
*
* @param pageSize how many references to return in each paginated request to WSM
* @return the list of unique ids for all pre-existing snapshot references
*/
protected List<UUID> existingPolicySnapshotIds(int pageSize) {
return extractSnapshotIds(listAllSnapshots(pageSize));
}

/**
* Given a ResourceList, find all the valid ids of referenced snapshots in that list
*
* @param resourceList the list in which to look for snapshotIds
* @return the list of unique ids in the provided list
*/
protected List<UUID> extractSnapshotIds(ResourceList resourceList) {
return resourceList.getResources().stream()
.map(this::safeGetSnapshotId)
.filter(Objects::nonNull)
.distinct()
.toList();
}

/**
* Get the full list of all snapshot references for the current workspace. WSM returns these
* results paginated; this method retrieves pages from WSM and aggregates the results.
*
* @param pageSize number of results to return from WSM at once
* @return the full list of all snapshot references for the workspace.
*/
protected ResourceList listAllSnapshots(int pageSize) {
final AtomicInteger offset = new AtomicInteger(0);
calypsomatic marked this conversation as resolved.
Show resolved Hide resolved
final int hardLimit =
10000; // under no circumstances return more than this many snapshots from WSM

ResourceList finalList = new ResourceList(); // collect our results

while (offset.get() < hardLimit) {
// get a page of results from WSM
RestClientRetry.RestCall<ResourceList> restCall =
(() -> wsmDao.enumerateDataRepoSnapshotReferences(workspaceId, offset.get(), pageSize));
ResourceList thisPage =
restClientRetry.withRetryAndErrorHandling(
restCall, "WSM.enumerateDataRepoSnapshotReferences");

// add this page of results to our collector
finalList.getResources().addAll(thisPage.getResources());

if (thisPage.getResources().size() < pageSize) {
// fewer results from WSM than we requested; this is the last page of results
return finalList;
} else {
// bump our offset and request another page of results
offset.addAndGet(pageSize);
}
}

throw new PfbImportException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any sensible way to fail faster if we'll exceed the hard limit? Maybe by pushing the hard limit enforcement down to WSM?

Feels like this approach ends up doing a whole bunch of wasted work and would potentially fail slowly and cause resource waste/exhaustion when paired with some form of retries.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only other approach I can think of is to pick a number (maybe not as high as 10000), and make a single request to WSM for that many resources - don't attempt to paginate. Do you think that would be better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if by your guess this is a really really unlikely scenario that it's maybe not something to worry about and this might be the perfect safeguard...wait till it happens, then worry about it.

I don't think I understand well enough how these snapshots get written to WSM, but the only thing that I think might be better is if we could somehow head them off at write time (into WSM) rather than here at read time.

"Exceeded hard limit of %d for number of pre-existing snapshot references"
.formatted(hardLimit));
}

/**
* Given a ResourceDescription representing a snapshot reference, retrieve that snapshot's UUID.
*
* @param resourceDescription the WSM object in which to find a snapshotId
* @return the snapshotId if found, else null
*/
protected UUID safeGetSnapshotId(ResourceDescription resourceDescription) {
ResourceAttributesUnion resourceAttributes = resourceDescription.getResourceAttributes();
if (resourceAttributes != null) {
DataRepoSnapshotAttributes dataRepoSnapshot = resourceAttributes.getGcpDataRepoSnapshot();
if (dataRepoSnapshot != null) {
String snapshotIdStr = dataRepoSnapshot.getSnapshot();
try {
return UUID.fromString(snapshotIdStr);
} catch (Exception e) {
// noop; this will return null
davidangb marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another possible place for an Optional?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same pushback here - as a non-public method (it's only protected instead of private so I can unit-test it), I'm optimizing for performance instead of a good API - this one is also used directly as a stream mapping function

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.databiosphere.workspacedataservice.service.model.exception;

import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ResponseStatus;

@ResponseStatus(code = HttpStatus.INTERNAL_SERVER_ERROR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way PfbImportException gets thrown (for blowing the hard limit), and the fact that it subclasses IllegalArgumentException make it seem more like some kind of client error than an internal error to me. Should this be a different HTTP status?

That said, I think we'd want to know about these early on as we learn how this feature gets used, so if it makes more sense that it's an 'us' problem than a client problem, then you can keep it as INTERNAL_SERVER_ERROR...I just wonder if it should subclass a different kind of exception in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of IllegalArgumentException here was a copy/paste mistake on my part, I have changed to extend RuntimeException!

I do consider this an "us" problem. Even though I think it highly unlikely that a workspace will have > 10000 snapshot references, it's our fault that we can't handle such a case, so I agree on keeping it as INTERNAL_SERVER_ERROR.

fwiw, I originally wrote the loop inside listAllSnapshots as a while(true) loop, but I got nervous about infinity and decided to add a sanity check and thus imposed a hard limit.

public class PfbImportException extends IllegalArgumentException {

public PfbImportException(String message) {
super(message);
}

public PfbImportException(String message, Exception e) {
super(message, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,15 @@ public void createDataRepoSnapshotReference(SnapshotModel snapshotModel) {
}
}

/**
 * Retrieve one page of referenced Data Repo snapshot resources for the given workspace.
 *
 * @param workspaceId workspace to query
 * @param offset zero-based index of the first reference to return
 * @param limit maximum number of references to return in this page
 * @return a page of snapshot references from Workspace Manager
 * @throws ApiException on Workspace Manager client errors
 */
public ResourceList enumerateDataRepoSnapshotReferences(UUID workspaceId, int offset, int limit)
    throws ApiException {
  // get a page of results from WSM
  return enumerateResources(
      workspaceId, offset, limit, ResourceType.DATA_REPO_SNAPSHOT, StewardshipType.REFERENCED);
}

/** Retrieves the azure storage container url and sas token for a given workspace. */
public String getBlobStorageUrl(String storageWorkspaceId, String authToken) {
final ResourceApi resourceApi = this.workspaceManagerClientFactory.getResourceApi(authToken);
final ControlledAzureResourceApi azureResourceApi =
this.workspaceManagerClientFactory.getAzureResourceApi(authToken);
int count = 0;
Expand All @@ -59,8 +65,7 @@ public String getBlobStorageUrl(String storageWorkspaceId, String authToken) {
LOGGER.debug(
"Finding storage resource for workspace {} from Workspace Manager ...", workspaceUUID);
ResourceList resourceList =
resourceApi.enumerateResources(
workspaceUUID, 0, 5, ResourceType.AZURE_STORAGE_CONTAINER, null);
enumerateResources(workspaceUUID, 0, 5, ResourceType.AZURE_STORAGE_CONTAINER, null);
// note: it is possible a workspace may have more than one storage container associated with
// it
// but currently there is no way to tell which one is the primary except for checking the
Expand Down Expand Up @@ -94,4 +99,17 @@ public UUID extractResourceId(ResourceList resourceList, String storageWorkspace
}
return null;
}

/**
 * Fetch one page of resources of the given type and stewardship from Workspace Manager.
 *
 * @param workspaceId workspace to query
 * @param offset zero-based index of the first resource to return
 * @param limit maximum number of resources to return in this page
 * @param resourceType resource type to filter by
 * @param stewardshipType stewardship type to filter by; callers in this file pass null when no
 *     stewardship filter is wanted
 * @return the requested page of resources
 * @throws ApiException on Workspace Manager client errors
 */
private ResourceList enumerateResources(
    UUID workspaceId,
    int offset,
    int limit,
    ResourceType resourceType,
    StewardshipType stewardshipType)
    throws ApiException {
  // NOTE(review): passes a null auth token to the client factory — presumably the factory
  // falls back to ambient/request-scoped credentials; confirm against the factory impl.
  ResourceApi resourceApi = this.workspaceManagerClientFactory.getResourceApi(null);
  // TODO: retries
  return resourceApi.enumerateResources(
      workspaceId, offset, limit, resourceType, stewardshipType);
}
}
Loading
Loading