AJ-1227: parse incoming PFBs into tables (#387)
calypsomatic authored Nov 14, 2023
1 parent 0def2c7 commit 39538ba
Showing 16 changed files with 1,081 additions and 182 deletions.
@@ -1,24 +1,33 @@
package org.databiosphere.workspacedataservice.dataimport;

import static org.databiosphere.workspacedataservice.dataimport.PfbRecordConverter.ID_FIELD;
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_INSTANCE;
import static org.databiosphere.workspacedataservice.shared.model.Schedulable.ARG_URL;

import bio.terra.datarepo.model.SnapshotModel;
import bio.terra.pfb.PfbReader;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.databiosphere.workspacedataservice.activitylog.ActivityLogger;
import org.databiosphere.workspacedataservice.dao.JobDao;
import org.databiosphere.workspacedataservice.jobexec.QuartzJob;
import org.databiosphere.workspacedataservice.retry.RestClientRetry;
import org.databiosphere.workspacedataservice.service.BatchWriteService;
import org.databiosphere.workspacedataservice.service.model.BatchWriteResult;
import org.databiosphere.workspacedataservice.service.model.exception.PfbParsingException;
import org.databiosphere.workspacedataservice.service.model.exception.RestException;
import org.databiosphere.workspacedataservice.shared.model.RecordType;
import org.databiosphere.workspacedataservice.workspacemanager.WorkspaceManagerDao;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
@@ -37,20 +46,24 @@ public class PfbQuartzJob extends QuartzJob {

private final JobDao jobDao;
private final WorkspaceManagerDao wsmDao;

private final BatchWriteService batchWriteService;
private final ActivityLogger activityLogger;
private final UUID workspaceId;

private final RestClientRetry restClientRetry;

public PfbQuartzJob(
JobDao jobDao,
WorkspaceManagerDao wsmDao,
RestClientRetry restClientRetry,
BatchWriteService batchWriteService,
ActivityLogger activityLogger,
@Value("${twds.instance.workspace-id}") UUID workspaceId) {
this.jobDao = jobDao;
this.wsmDao = wsmDao;
this.restClientRetry = restClientRetry;
this.workspaceId = workspaceId;
this.batchWriteService = batchWriteService;
this.activityLogger = activityLogger;
}

@Override
@@ -60,45 +73,108 @@ protected JobDao getJobDao() {

@Override
protected void executeInternal(UUID jobId, JobExecutionContext context) {
// Grab the PFB url from the job's data map
JobDataMap jobDataMap = context.getMergedJobDataMap();
URL url = getJobDataUrl(jobDataMap, ARG_URL);
UUID targetInstance = getJobDataUUID(jobDataMap, ARG_INSTANCE);

// Find all the snapshot ids in the PFB, then create or verify references from the
// workspace to the snapshot for each of those snapshot ids.
// This will throw an exception if there are policy conflicts between the workspace
// and the snapshots.
// TODO AJ-1452: can this pass also identify all schemas/datatypes?
// TODO AJ-1452: can this pass also check if record types are contiguous?
logger.info("Finding snapshots in this PFB...");
Set<UUID> snapshotIds = withPfbStream(url, this::findSnapshots);

logger.info("Linking snapshots...");
linkSnapshots(snapshotIds);

// Import all the tables and rows inside the PFB.
logger.info("Importing tables and rows from this PFB...");
withPfbStream(url, stream -> importTables(stream, targetInstance));

// TODO AJ-1453: save the result of importTables and persist it to the job
}

/**
* Definition of a function that consumes a PFB stream (as a DataFileStream<GenericRecord>).
*/
@FunctionalInterface
public interface PfbStreamConsumer<T> {
T run(DataFileStream<GenericRecord> dataStream);
}

/**
* Convenience wrapper that executes a PfbStreamConsumer on a PFB at a given URL, handling
* opening and closing of a DataFileStream for that PFB.
*
* @param url location of the PFB
* @param consumer code to execute against the PFB's contents
*/
<T> T withPfbStream(URL url, PfbStreamConsumer<T> consumer) {
try (DataFileStream<GenericRecord> dataStream =
PfbReader.getGenericRecordsStream(url.toString())) {
return consumer.run(dataStream);
} catch (Exception e) {
throw new PfbParsingException("Error processing PFB", e);
}
}

/**
* Given a DataFileStream representing a PFB, import all the tables and rows inside that PFB.
*
* @param dataStream stream representing the PFB.
*/
BatchWriteResult importTables(DataFileStream<GenericRecord> dataStream, UUID targetInstance) {
BatchWriteResult result =
batchWriteService.batchWritePfbStream(dataStream, targetInstance, Optional.of(ID_FIELD));

result
.entrySet()
.forEach(
entry -> {
RecordType recordType = entry.getKey();
int quantity = entry.getValue();
activityLogger.saveEventForCurrentUser(
user -> user.upserted().record().withRecordType(recordType).ofQuantity(quantity));
});
return result;
}

/**
* Given a DataFileStream representing a PFB, find all the unique snapshot ids in the PFB by
* looking in the "source_datarepo_snapshot_id" column of each row in the PFB
*
* @param dataStream stream representing the PFB.
* @return unique UUIDs found in the PFB
*/
Set<UUID> findSnapshots(DataFileStream<GenericRecord> dataStream) {
// translate the Avro DataFileStream into a Java stream
Stream<GenericRecord> recordStream =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(dataStream.iterator(), Spliterator.ORDERED), false);

// process the stream into a list of unique snapshotIds
return recordStream
.map(rec -> rec.get("object")) // Records in a pfb are stored under the key "object"
.filter(GenericRecord.class::isInstance) // which we expect to be a GenericRecord
.map(GenericRecord.class::cast)
.filter(obj -> obj.hasField(SNAPSHOT_ID_IDENTIFIER)) // avoid exception if field nonexistent
.map(obj -> obj.get(SNAPSHOT_ID_IDENTIFIER)) // get the source_datarepo_snapshot_id value
.filter(Objects::nonNull) // expect source_datarepo_snapshot_id to be non-null
.map(obj -> maybeUuid(obj.toString()))
.filter(Objects::nonNull) // drop values that did not parse as valid UUIDs
.collect(Collectors.toSet());
}

/**
* Given a set of snapshot ids, create references from the workspace to the snapshot for each id
* that does not already have a reference.
*
* @param snapshotIds the list of snapshot ids to create or verify references.
*/
protected void linkSnapshots(Set<UUID> snapshotIds) {
// list existing snapshots linked to this workspace
TdrSnapshotSupport tdrSnapshotSupport =
new TdrSnapshotSupport(workspaceId, wsmDao, restClientRetry);
@@ -121,8 +197,8 @@ protected void linkSnapshots(List<UUID> snapshotIds) {
(() -> wsmDao.linkSnapshotForPolicy(new SnapshotModel().id(uuid)));
restClientRetry.withRetryAndErrorHandling(
voidRestCall, "WSM.createDataRepoSnapshotReference");
} catch (RestException re) {
throw new PfbParsingException("Error processing PFB: " + re.getMessage(), re);
}
}
}
@@ -0,0 +1,55 @@
package org.databiosphere.workspacedataservice.dataimport;

import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.databiosphere.workspacedataservice.shared.model.Record;
import org.databiosphere.workspacedataservice.shared.model.RecordAttributes;
import org.databiosphere.workspacedataservice.shared.model.RecordType;

/** Logic to convert a PFB's GenericRecord to WDS's Record */
public class PfbRecordConverter {

public static final String ID_FIELD = "id";
public static final String TYPE_FIELD = "name";
public static final String OBJECT_FIELD = "object";

public Record genericRecordToRecord(GenericRecord genRec) {
Record converted =
new Record(
genRec.get(ID_FIELD).toString(),
RecordType.valueOf(genRec.get(TYPE_FIELD).toString()),
RecordAttributes.empty());

// contains attributes
if (genRec.get(OBJECT_FIELD) instanceof GenericRecord objectAttributes) {
Schema schema = objectAttributes.getSchema();
List<Schema.Field> fields = schema.getFields();
RecordAttributes attributes = RecordAttributes.empty();
for (Schema.Field field : fields) {
String fieldName = field.name();
Object value =
objectAttributes.get(fieldName) == null
? null
: convertAttributeType(objectAttributes.get(fieldName));
attributes.putAttribute(fieldName, value);
}
converted.setAttributes(attributes);
}

return converted;
}

// TODO AJ-1452: respect the datatypes returned by the PFB. For now, we make no guarantees
// about datatypes; many values are just toString()-ed. This allows us to commit incremental
// progress and save some complicated work for later.
Object convertAttributeType(Object attribute) {
if (attribute == null) {
return null;
}
if (attribute instanceof Long /*or other number*/) {
return attribute;
}
return attribute.toString(); // easier for the datatype inferer to parse
}
}
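For reference, a minimal sketch of how the new converter could be exercised against a hand-built Avro record. The nested schema, the attribute names ("tissue_type", "read_count"), and the values below are hypothetical stand-ins for a real PFB entity; only the top-level "id", "name", and "object" fields are what PfbRecordConverter itself expects.

package org.databiosphere.workspacedataservice.dataimport;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.databiosphere.workspacedataservice.shared.model.Record;

public class PfbRecordConverterSketch {
  public static void main(String[] args) {
    // hypothetical schema for the nested "object" attributes of one PFB entity
    Schema objectSchema =
        SchemaBuilder.record("sample_attributes")
            .fields()
            .optionalString("tissue_type")
            .optionalLong("read_count")
            .endRecord();

    // top-level entity shape read by the converter: "id", "name", and "object"
    Schema entitySchema =
        SchemaBuilder.record("Entity")
            .fields()
            .requiredString("id")
            .requiredString("name")
            .name("object").type(objectSchema).noDefault()
            .endRecord();

    GenericRecord attributes = new GenericData.Record(objectSchema);
    attributes.put("tissue_type", "blood");
    attributes.put("read_count", 12345L);

    GenericRecord entity = new GenericData.Record(entitySchema);
    entity.put("id", "HG00096");
    entity.put("name", "sample"); // becomes the WDS record type
    entity.put("object", attributes);

    // converts to a WDS Record with id "HG00096", type "sample", and two attributes
    Record converted = new PfbRecordConverter().genericRecordToRecord(entity);
    System.out.println(converted);
  }
}

Per the AJ-1452 TODO above, non-numeric attribute values are stringified for now, so "tissue_type" lands in the converted record as a plain string while "read_count" keeps its Long value.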