Skip to content

Commit

Permalink
feat(datahub-client): improve Avro schema conversions
Browse files: browse the repository at this point in the history
  • Loading branch information
sgomezvillamor committed Dec 16, 2024
1 parent f9251c8 commit 0b80492
Show file tree
Hide file tree
Showing 2 changed files with 195 additions and 104 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -23,6 +23,8 @@ public class AvroSchemaConverter implements SchemaConverter<Schema> {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Map<String, Supplier<SchemaFieldDataType.Type>> LOGICAL_TYPE_MAPPING;
public static final String ARRAY_ITEMS_FIELD_NAME = "items";
public static final String MAP_VALUE_FIELD_NAME = "value";

static {
Map<String, Supplier<SchemaFieldDataType.Type>> logicalTypeMap = new HashMap<>();
Expand Down Expand Up @@ -337,6 +339,18 @@ private void processArrayField(
// Set parent type for proper array handling
DataHubType arrayDataHubType = new DataHubType(ArrayType.class, elementType);

SchemaField arrayField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(arrayDataHubType.asSchemaFieldType())
.setNativeDataType("array(" + elementType + ")")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, arrayField);
log.debug("Array field path: {} with doc: {}", fieldPath.asString(), field.doc());
fields.add(arrayField);

// Process element type if it's complex
if (elementSchema.getType() == Schema.Type.RECORD
|| elementSchema.getType() == Schema.Type.ARRAY
Expand All @@ -349,25 +363,12 @@ private void processArrayField(
new FieldElement(Collections.singletonList("array"), new ArrayList<>(), null, null));
Schema.Field elementField =
new Schema.Field(
field.name(),
ARRAY_ITEMS_FIELD_NAME,
elementSchema,
elementSchema.getDoc() != null ? elementSchema.getDoc() : field.doc(),
null // TODO: What is the default value for an array element?
);
processField(elementField, fieldPath, defaultNullable, fields, isNullable, arrayDataHubType);
} else {

SchemaField arrayField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(arrayDataHubType.asSchemaFieldType())
.setNativeDataType("array(" + elementType + ")")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, arrayField);
log.debug("Array field path: {} with doc: {}", fieldPath.asString(), field.doc());
fields.add(arrayField);
processField(elementField, fieldPath, defaultNullable, fields, true, arrayDataHubType);
}
}

Expand All @@ -386,14 +387,25 @@ private void processMapField(
DataHubType mapDataHubType = new DataHubType(MapType.class, valueType);
fieldPath = fieldPath.expandType("map", mapSchema);

SchemaField mapField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(mapDataHubType.asSchemaFieldType())
.setNativeDataType("map<string," + valueType + ">")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, mapField);
fields.add(mapField);

// Process value type if it's complex
if (valueSchema.getType() == Schema.Type.RECORD
|| valueSchema.getType() == Schema.Type.ARRAY
|| valueSchema.getType() == Schema.Type.MAP
|| valueSchema.getType() == Schema.Type.UNION) {
Schema.Field valueField =
new Schema.Field(
field.name(),
MAP_VALUE_FIELD_NAME,
valueSchema,
valueSchema.getDoc() != null ? valueSchema.getDoc() : field.doc(),
null // TODO: What is the default value for a map value?
Expand All @@ -404,18 +416,7 @@ private void processMapField(
.clonePlus(
new FieldElement(
Collections.singletonList("map"), new ArrayList<>(), null, null));
processField(valueField, valueFieldPath, defaultNullable, fields, isNullable, mapDataHubType);
} else {
SchemaField mapField =
new SchemaField()
.setFieldPath(fieldPath.asString())
.setType(mapDataHubType.asSchemaFieldType())
.setNativeDataType("map<string," + valueType + ">")
.setNullable(isNullable || defaultNullable)
.setIsPartOfKey(fieldPath.isKeySchema());

populateCommonProperties(field, mapField);
fields.add(mapField);
processField(valueField, valueFieldPath, defaultNullable, fields, true, mapDataHubType);
}
}

Expand Down
Loading

0 comments on commit 0b80492

Please sign in to comment.