AJ-1452: pfb data types simpler (#416)
davidangb authored Nov 20, 2023
1 parent 1df6ce9 commit 3ceaa87
Showing 8 changed files with 218 additions and 26 deletions.
2 changes: 1 addition & 1 deletion service/build.gradle
@@ -80,7 +80,7 @@ dependencies {
implementation 'com.squareup.okhttp3:okhttp:4.11.0' // required by Sam client
implementation "bio.terra:datarepo-client:1.537.0-SNAPSHOT"
implementation "bio.terra:workspace-manager-client:0.254.717-SNAPSHOT"
implementation "bio.terra:java-pfb-library:0.13.0"
implementation "bio.terra:java-pfb-library:0.14.0"
implementation project(path: ':client')

// hk2 is required to use WSM client, but not correctly exposed by the client
@@ -82,15 +82,17 @@ protected void executeInternal(UUID jobId, JobExecutionContext context) {
// workspace to the snapshot for each of those snapshot ids.
// This will throw an exception if there are policy conflicts between the workspace
// and the snapshots.
// TODO AJ-1452: can this pass also identify all schemas/datatypes?
// TODO AJ-1452: can this pass also check if record types are contiguous?
//
// This is HTTP connection #1 to the PFB.
logger.info("Finding snapshots in this PFB...");
Set<UUID> snapshotIds = withPfbStream(url, this::findSnapshots);

logger.info("Linking snapshots...");
linkSnapshots(snapshotIds);

// Import all the tables and rows inside the PFB.
//
// This is HTTP connection #2 to the PFB.
logger.info("Importing tables and rows from this PFB...");
withPfbStream(url, stream -> importTables(stream, targetInstance));

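The two passes above each open their own HTTP connection to the PFB and hand the resulting Avro stream to a function, so the snapshot-resolution pass never has to buffer records for the later import pass. The repo's withPfbStream implementation is not shown in this diff; the following is a minimal sketch of that pattern using plain Avro APIs, with the class name and error handling as illustrative assumptions.

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.function.Function;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    class PfbPassSketch {
      // Open one connection to the PFB, run a single pass over its Avro
      // records, and close the stream when the pass completes.
      static <T> T withPfbStream(URL url, Function<DataFileStream<GenericRecord>, T> pass) {
        try (InputStream in = url.openStream();
            DataFileStream<GenericRecord> stream =
                new DataFileStream<>(in, new GenericDatumReader<>())) {
          return pass.apply(stream);
        } catch (IOException e) {
          throw new RuntimeException("failed to read PFB at " + url, e);
        }
      }
    }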
PfbRecordConverter.java
@@ -1,7 +1,10 @@
package org.databiosphere.workspacedataservice.dataimport;

import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.databiosphere.workspacedataservice.shared.model.Record;
import org.databiosphere.workspacedataservice.shared.model.RecordAttributes;
@@ -15,13 +18,14 @@ public class PfbRecordConverter {
public static final String OBJECT_FIELD = "object";

public Record genericRecordToRecord(GenericRecord genRec) {
// create the WDS record shell (id, record type, empty attributes)
Record converted =
new Record(
genRec.get(ID_FIELD).toString(),
RecordType.valueOf(genRec.get(TYPE_FIELD).toString()),
RecordAttributes.empty());

// contains attributes
// loop over all Avro fields and add to the record's attributes
if (genRec.get(OBJECT_FIELD) instanceof GenericRecord objectAttributes) {
Schema schema = objectAttributes.getSchema();
List<Schema.Field> fields = schema.getFields();
@@ -40,16 +44,51 @@ public Record genericRecordToRecord(GenericRecord genRec) {
return converted;
}

// TODO AJ-1452: respect the datatypes returned by the PFB. For now, we make no guarantees
// about datatypes; many values are just toString()-ed. This allows us to commit incremental
// progress and save some complicated work for later.
Object convertAttributeType(Object attribute) {

if (attribute == null) {
return null;
}
if (attribute instanceof Long /*or other number*/) {
return attribute;

// Avro numbers - see
// https://avro.apache.org/docs/current/api/java/org/apache/avro/generic/package-summary.html#package_description
if (attribute instanceof Long longAttr) {
return BigDecimal.valueOf(longAttr);
}
if (attribute instanceof Integer intAttr) {
return BigDecimal.valueOf(intAttr);
}
if (attribute instanceof Float floatAttr) {
return BigDecimal.valueOf(floatAttr);
}
if (attribute instanceof Double doubleAttr) {
return BigDecimal.valueOf(doubleAttr);
}

// Avro booleans
if (attribute instanceof Boolean boolAttr) {
return boolAttr;
}
return attribute.toString(); // easier for the datatype inferer to parse

// Avro enums
if (attribute instanceof GenericEnumSymbol<?> enumAttr) {
// TODO AJ-1479: decode enums using PfbReader.convertEnum
return enumAttr.toString();
}

// Avro arrays
if (attribute instanceof Collection<?> collAttr) {
// recurse
return collAttr.stream().map(this::convertAttributeType).toList();
}

// TODO AJ-1478: handle remaining possible Avro datatypes:
// Avro bytes are implemented as ByteBuffer. toString() these?
// Avro fixed are implemented as GenericFixed. toString() these?
// Avro maps are implemented as Map. Can we make these into WDS json?
// Avro records are implemented as GenericRecord. Can we make these into WDS json?

// for now, everything else is a String
return attribute.toString();
}
}
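To make the mapping above concrete, a few illustrative calls (a sketch, assuming same-package access, since convertAttributeType is package-private); each expected result follows directly from the branches above:

    PfbRecordConverter converter = new PfbRecordConverter();
    converter.convertAttributeType(42L);              // BigDecimal.valueOf(42L)
    converter.convertAttributeType(3.14D);            // BigDecimal.valueOf(3.14D)
    converter.convertAttributeType(true);             // Boolean.TRUE
    converter.convertAttributeType(List.of(1L, 2L));  // list of BigDecimals, via recursion
    converter.convertAttributeType("hello");          // "hello", the toString() fallback

One subtlety: BigDecimal has no valueOf(float) overload, so Float attributes widen to double first, and a value like 1.1f converts with float-representation noise rather than as the literal 1.1.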
@@ -104,7 +104,6 @@ private BatchWriteResult consumeWriteStream(
// loop over all record types in this batch. For each record type, iff this is the first
// time we've seen this type, calculate a schema from its records and update the record type
// as necessary. Then, write the records into the table.
// TODO AJ-1452: for PFB imports, get schema from Avro, not from attribute values inference
for (RecordType recType : groupedRecords.keySet()) {
List<Record> rList = groupedRecords.get(recType).asList();
// have we already processed at least one batch of this record type?
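A minimal sketch of the "infer schema once per record type" bookkeeping described in the comment above; inferAndApplySchema and writeBatch are hypothetical stand-ins, not the service's actual methods:

    // Record types whose schema has already been inferred during this import.
    Set<RecordType> seenTypes = new HashSet<>();

    void writeGroupedBatch(Map<RecordType, List<Record>> groupedRecords) {
      for (Map.Entry<RecordType, List<Record>> entry : groupedRecords.entrySet()) {
        if (seenTypes.add(entry.getKey())) {
          // first batch of this type: derive column types from attribute values
          inferAndApplySchema(entry.getKey(), entry.getValue());
        }
        writeBatch(entry.getKey(), entry.getValue()); // upsert the rows
      }
    }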
PfbStreamWriteHandler.java
@@ -16,6 +16,11 @@ public class PfbStreamWriteHandler implements StreamingWriteHandler {

private final DataFileStream<GenericRecord> inputStream;

/**
* Create a new PfbStreamWriteHandler that wraps the given PFB stream.
*
* @param inputStream the PFB stream
*/
public PfbStreamWriteHandler(DataFileStream<GenericRecord> inputStream) {
this.inputStream = inputStream;
}
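PfbStreamWriteHandler's role is to batch records off the wrapped DataFileStream for the write pipeline. The StreamingWriteHandler interface is not shown in this diff, so the following batching sketch is an assumption about its shape rather than the actual implementation:

    // Pull up to batchSize records from the open Avro stream; an empty list
    // signals end-of-stream to the caller. DataFileStream is an Iterator.
    List<GenericRecord> readBatch(int batchSize) {
      List<GenericRecord> batch = new ArrayList<>();
      while (batch.size() < batchSize && inputStream.hasNext()) {
        batch.add(inputStream.next());
      }
      return batch;
    }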
PfbRecordConverterTest.java
@@ -3,16 +3,21 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.RandomStringUtils;
import org.databiosphere.workspacedataservice.shared.model.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class PfbRecordConverterTest {

@@ -48,40 +53,178 @@ void nullObjectAttributes() {
assertThat(actual.attributeSet()).isEmpty();
}

// if the GenericRecord has attributes in the "object" column, the WDS record should have the same
// attributes
// "smoke test" unit test: given an Avro GenericRecord containing a variety of values,
// convert to a WDS record and assert correctness of that WDS record.
@Test
void valuesInObjectAttributes() {
Schema enumSchema =
Schema.createEnum("name", "doc", "namespace", List.of("enumValue1", "enumValue2"));

Schema myObjSchema =
Schema.createRecord(
"objectSchema",
"mytype",
"doc",
"namespace",
false,
List.of(
new Schema.Field("marco", Schema.create(Schema.Type.STRING)),
new Schema.Field("pi", Schema.create(Schema.Type.LONG)),
new Schema.Field("afile", Schema.create(Schema.Type.STRING))));
new Schema.Field("afile", Schema.create(Schema.Type.STRING)),
new Schema.Field("booly", Schema.create(Schema.Type.BOOLEAN)),
new Schema.Field("enum", enumSchema),
new Schema.Field(
"arrayOfNumbers", Schema.createArray(Schema.create(Schema.Type.LONG))),
new Schema.Field(
"arrayOfStrings", Schema.createArray(Schema.create(Schema.Type.STRING))),
new Schema.Field("arrayOfEnums", Schema.createArray(enumSchema))));

GenericData.Record objectAttributes = new GenericData.Record(myObjSchema);
objectAttributes.put("marco", "polo");
objectAttributes.put("pi", 3.14159);
objectAttributes.put("afile", "https://some/path/to/a/file");

GenericRecord input =
PfbTestUtils.makeRecord("this-record-has", "a-null-for-the-object-field", objectAttributes);
objectAttributes.put("booly", Boolean.TRUE);
objectAttributes.put(
"enum", new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "enumValue2"));
objectAttributes.put("arrayOfNumbers", List.of(1.2, 3.4));
objectAttributes.put("arrayOfStrings", List.of("one", "two", "three"));
objectAttributes.put(
"arrayOfEnums",
List.of(
new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "enumValue2"),
new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "enumValue1")));

GenericRecord input = PfbTestUtils.makeRecord("my-id", "mytype", objectAttributes);
Record actual = new PfbRecordConverter().genericRecordToRecord(input);

Set<Map.Entry<String, Object>> actualAttributeSet = actual.attributeSet();
Set<String> actualKeySet =
actualAttributeSet.stream().map(Map.Entry::getKey).collect(Collectors.toSet());
assertEquals(Set.of("marco", "pi", "afile"), actualKeySet);
assertEquals(
Set.of(
"marco",
"pi",
"afile",
"booly",
"enum",
"arrayOfNumbers",
"arrayOfStrings",
"arrayOfEnums"),
actualKeySet);

assertEquals("polo", actual.getAttributeValue("marco"));
assertEquals("https://some/path/to/a/file", actual.getAttributeValue("afile"));
// TODO AJ-1452: this should remain a number, not become a string
assertEquals("3.14159", actual.getAttributeValue("pi"));
assertEquals(BigDecimal.valueOf(3.14159), actual.getAttributeValue("pi"));
assertEquals(Boolean.TRUE, actual.getAttributeValue("booly"));
assertEquals("enumValue2", actual.getAttributeValue("enum"));
assertEquals(
List.of(BigDecimal.valueOf(1.2), BigDecimal.valueOf(3.4)),
actual.getAttributeValue("arrayOfNumbers"));
assertEquals(List.of("one", "two", "three"), actual.getAttributeValue("arrayOfStrings"));
assertEquals(List.of("enumValue2", "enumValue1"), actual.getAttributeValue("arrayOfEnums"));
}

// arguments for parameterized test, in the form of: input value, expected return value
static Stream<Arguments> provideConvertScalarAttributesArgs() {
return Stream.of(
// most basic case
Arguments.of("hello", "hello"),
// null inputs
Arguments.of(null, null),
// numbers
Arguments.of(Long.MIN_VALUE, BigDecimal.valueOf(Long.MIN_VALUE)),
Arguments.of(Long.MAX_VALUE, BigDecimal.valueOf(Long.MAX_VALUE)),
Arguments.of(Integer.MIN_VALUE, BigDecimal.valueOf(Integer.MIN_VALUE)),
Arguments.of(Integer.MAX_VALUE, BigDecimal.valueOf(Integer.MAX_VALUE)),
Arguments.of(Float.MIN_VALUE, BigDecimal.valueOf(Float.MIN_VALUE)),
Arguments.of(Float.MAX_VALUE, BigDecimal.valueOf(Float.MAX_VALUE)),
Arguments.of(Double.MIN_VALUE, BigDecimal.valueOf(Double.MIN_VALUE)),
Arguments.of(Double.MAX_VALUE, BigDecimal.valueOf(Double.MAX_VALUE)),
// booleans
Arguments.of(true, true),
Arguments.of(false, false));
}

// targeted test for converting scalar Avro values to WDS values
@ParameterizedTest(name = "with input of {0}, return value should be {2}")
@MethodSource("provideConvertScalarAttributesArgs")
void convertScalarAttributes(Object input, Object expected) {
PfbRecordConverter pfbRecordConverter = new PfbRecordConverter();

Object actual = pfbRecordConverter.convertAttributeType(input);
assertEquals(expected, actual);
}

// targeted test for converting scalar Avro enums to WDS values
@Test
void convertScalarEnums() {
PfbRecordConverter pfbRecordConverter = new PfbRecordConverter();

Object input = new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "bar");

Object actual = pfbRecordConverter.convertAttributeType(input);
assertEquals("bar", actual);
}

// arguments for parameterized test, in the form of: input value, expected return value
static Stream<Arguments> provideConvertArrayAttributesArgs() {
return Stream.of(
// most basic case
Arguments.of(List.of("hello", "world"), List.of("hello", "world")),
// null inputs
Arguments.of(null, null),
// empty arrays
Arguments.of(List.of(), List.of()),
// numbers
Arguments.of(
List.of(Long.MIN_VALUE, 1L, Long.MAX_VALUE),
List.of(
BigDecimal.valueOf(Long.MIN_VALUE),
BigDecimal.valueOf(1L),
BigDecimal.valueOf(Long.MAX_VALUE))),
Arguments.of(
List.of(Integer.MIN_VALUE, 1, Integer.MAX_VALUE),
List.of(
BigDecimal.valueOf(Integer.MIN_VALUE),
BigDecimal.valueOf(1),
BigDecimal.valueOf(Integer.MAX_VALUE))),
Arguments.of(
List.of(Float.MIN_VALUE, 1F, Float.MAX_VALUE),
List.of(
BigDecimal.valueOf(Float.MIN_VALUE),
BigDecimal.valueOf(1F),
BigDecimal.valueOf(Float.MAX_VALUE))),
Arguments.of(
List.of(Double.MIN_VALUE, 1D, Double.MAX_VALUE),
List.of(
BigDecimal.valueOf(Double.MIN_VALUE),
BigDecimal.valueOf(1D),
BigDecimal.valueOf(Double.MAX_VALUE))),
// booleans
Arguments.of(List.of(true, false, true), List.of(true, false, true)));
}

// targeted test for converting array Avro values to WDS values
@ParameterizedTest(name = "with array input of {0}, return value should be {2}")
@MethodSource("provideConvertArrayAttributesArgs")
void convertArrayAttributes(Object input, Object expected) {
PfbRecordConverter pfbRecordConverter = new PfbRecordConverter();

Object actual = pfbRecordConverter.convertAttributeType(input);
assertEquals(expected, actual);
}

// targeted test for converting array Avro enums to WDS values
@Test
void convertArrayOfEnums() {
PfbRecordConverter pfbRecordConverter = new PfbRecordConverter();

Object input =
List.of(
new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "bar"),
new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "foo"),
new GenericData.EnumSymbol(Schema.create(Schema.Type.STRING), "baz"));

// TODO AJ-1452: add more test coverage as the runtime functionality evolves.
Object actual = pfbRecordConverter.convertAttributeType(input);
assertEquals(List.of("bar", "foo", "baz"), actual);
}
}
@@ -158,8 +158,6 @@ void schemaInferredOnceForEachRecordType() throws IOException {
eq(INSTANCE), eq(RecordType.valueOf("widget")), any(), any(), eq(primaryKey.get()));

// but we should only have inferred schemas three times - once for each record Type
// TODO AJ-1452: this call to `verify` may change significantly as part of AJ-1452; reassess
// during that implementation.
@SuppressWarnings("unchecked")
ArgumentCaptor<List<Record>> argumentCaptor = ArgumentCaptor.forClass(List.class);
verify(inferer, times(3)).inferTypes(argumentCaptor.capture());
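For reference, the ArgumentCaptor pattern in this hunk reduces to the following; the captor must be built from the raw List class, which is why the diff carries @SuppressWarnings("unchecked"):

    @SuppressWarnings("unchecked")
    ArgumentCaptor<List<Record>> captor = ArgumentCaptor.forClass(List.class);
    // inferTypes must have run exactly once per record type (three types here)
    verify(inferer, times(3)).inferTypes(captor.capture());
    List<List<Record>> allBatches = captor.getAllValues(); // one entry per call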
@@ -8,6 +8,7 @@

import bio.terra.pfb.PfbReader;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.util.List;
import org.apache.avro.file.DataFileStream;
@@ -131,8 +132,13 @@ void pfbTablesAreParsedCorrectly() {
assertEquals("HG01101_cram", firstRecord.getId());
assertEquals(RecordType.valueOf("submitted_aligned_reads"), firstRecord.getRecordType());
assertEquals(19, firstRecord.attributeSet().size());
// TODO AJ-1452: when PFB parsing properly respects datatypes, add assertions here, or
// better yet write more tests for datatype/attribute handling.

// smoke-test a few values
assertEquals(BigDecimal.valueOf(512), firstRecord.getAttributeValue("file_size"));
assertEquals("registered", firstRecord.getAttributeValue("file_state"));
assertEquals(
"drs://example.org/dg.4503/cc32d93d-a73c-4d2c-a061-26c0410e74fa",
firstRecord.getAttributeValue("ga4gh_drs_uri"));

Record secondRecord = result.get(1);
assertNotNull(secondRecord);
