Skip to content

Commit

Permalink
[Coral-Schema] Fix incorrect type derivation for repeated field refer…
Browse files Browse the repository at this point in the history
…ence on UDF calls (linkedin#510)

* fix field reference bug in coral schema

* bring back override tag

* address nits
  • Loading branch information
KevinGe00 committed Jul 31, 2024
1 parent 0537b6c commit fe40ec3
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ public static RelDataType convert(StructTypeInfo structType, final RelDataTypeFa
// The schema of output Struct conforms to https://github.com/trinodb/trino/pull/3483
// except we adopted "integer" for the type of "tag" field instead of "tinyint" in the Trino patch
// for compatibility with other platforms that Iceberg currently doesn't support tinyint type.
// When the field count inside UnionTypeInfo is one, we surface the underlying RelDataType instead.

// Note: this is subject to change in the future, pending the discussion in
// https://mail-archives.apache.org/mod_mbox/iceberg-dev/202112.mbox/browser
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2019-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -495,48 +495,58 @@ public RexNode visitRangeRef(RexRangeRef rexRangeRef) {
public RexNode visitFieldAccess(RexFieldAccess rexFieldAccess) {
RexNode referenceExpr = rexFieldAccess.getReferenceExpr();

if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator() instanceof SqlUserDefinedFunction) {
String oldFieldName = rexFieldAccess.getField().getName();
String suggestNewFieldName = suggestedFieldNames.poll();
String newFieldName = SchemaUtilities.getFieldName(oldFieldName, suggestNewFieldName);

RelDataType fieldType = rexFieldAccess.getType();
boolean isNullable = SchemaUtilities.isFieldNullable((RexCall) referenceExpr, inputSchema);
// TODO: add field documentation
SchemaUtilities.appendField(newFieldName, fieldType, null, fieldAssembler, isNullable);
} else {
Deque<String> innerRecordNames = new LinkedList<>();
while (!(referenceExpr instanceof RexInputRef)) {
if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator().getName().equalsIgnoreCase("ITEM")) {
// While selecting `int_field` from `array_col:array<struct<int_field:int>>` using `array_col[x].int_field`,
// `rexFieldAccess` is like `ITEM($1, 1).int_field`, we need to set `referenceExpr` to be the first operand (`$1`) of `ITEM` function
referenceExpr = ((RexCall) referenceExpr).getOperands().get(0);
} else if (referenceExpr instanceof RexFieldAccess) {
// While selecting `int_field` from `struct_col:struct<inner_struct_col:struct<int_field:int>>` using `struct_col.inner_struct_col.int_field`,
// `rexFieldAccess` is like `$3.inner_struct_col.int_field`, we need to set `referenceExpr` to be the expr (`$3`) of itself.
// Besides, we need to store the field name (`inner_struct_col`) in `fieldNames` so that we can retrieve the correct inner struct from `topSchema` afterwards
innerRecordNames.push(((RexFieldAccess) referenceExpr).getField().getName());
referenceExpr = ((RexFieldAccess) referenceExpr).getReferenceExpr();
} else {
return super.visitFieldAccess(rexFieldAccess);
}
Deque<String> innerRecordNames = new LinkedList<>();
while (!(referenceExpr instanceof RexInputRef)) {
if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator().getName().equalsIgnoreCase("ITEM")) {
// While selecting `int_field` from `array_col:array<struct<int_field:int>>` using `array_col[x].int_field`,
// `rexFieldAccess` is like `ITEM($1, 1).int_field`, we need to set `referenceExpr` to be the first operand (`$1`) of `ITEM` function
referenceExpr = ((RexCall) referenceExpr).getOperands().get(0);
} else if (referenceExpr instanceof RexCall
&& ((RexCall) referenceExpr).getOperator() instanceof SqlUserDefinedFunction) {
// UDF calls can be field-referenced two or more times, for example, `extract_union(baz).single.tag_0`
// where baz is a struct containing a uniontype field. In this case, we simply need to use the derived type of the entire
// call. Note that this also takes care of the simple one-layer field reference on a UDF call.
handleUDFFieldAccess(rexFieldAccess, (RexCall) referenceExpr);
return rexFieldAccess;
} else if (referenceExpr instanceof RexFieldAccess) {
// While selecting `int_field` from `struct_col:struct<inner_struct_col:struct<int_field:int>>` using `struct_col.inner_struct_col.int_field`,
// `rexFieldAccess` is like `$3.inner_struct_col.int_field`, we need to set `referenceExpr` to be the expr (`$3`) of itself.
// Besides, we need to store the field name (`inner_struct_col`) in `innerRecordNames` so that we can retrieve the correct inner struct from `topSchema` afterwards
innerRecordNames.push(((RexFieldAccess) referenceExpr).getField().getName());
referenceExpr = ((RexFieldAccess) referenceExpr).getReferenceExpr();
} else {
return super.visitFieldAccess(rexFieldAccess);
}

String oldFieldName = rexFieldAccess.getField().getName();
String suggestNewFieldName = suggestedFieldNames.poll();
String newFieldName = SchemaUtilities.getFieldName(oldFieldName, suggestNewFieldName);
Schema topSchema = inputSchema.getFields().get(((RexInputRef) referenceExpr).getIndex()).schema();

Schema.Field accessedField = getFieldFromTopSchema(topSchema, oldFieldName, innerRecordNames);
assert accessedField != null;
SchemaUtilities.appendField(newFieldName, accessedField, fieldAssembler);
}

handleFieldAccess(rexFieldAccess, (RexInputRef) referenceExpr, innerRecordNames);
return rexFieldAccess;
}

/**
 * Appends the output field for a field access whose reference chain ends at an input column.
 *
 * <p>The output name is resolved by {@code SchemaUtilities.getFieldName} from the accessed field's own name and
 * the next entry polled from {@code suggestedFieldNames}. The field definition is looked up with
 * {@code getFieldFromTopSchema} inside the schema of the input column at {@code referenceExpr.getIndex()},
 * using {@code innerRecordNames} as the names of the intermediate record fields.
 *
 * @param rexFieldAccess the field access expression being visited
 * @param referenceExpr the input reference found at the root of the access chain
 * @param innerRecordNames names of the intermediate record fields collected while walking the chain
 * @throws IllegalStateException if the accessed field cannot be located in the input schema
 */
private void handleFieldAccess(RexFieldAccess rexFieldAccess, RexInputRef referenceExpr,
    Deque<String> innerRecordNames) {
  String oldFieldName = rexFieldAccess.getField().getName();
  String suggestNewFieldName = suggestedFieldNames.poll();
  String newFieldName = SchemaUtilities.getFieldName(oldFieldName, suggestNewFieldName);

  int inputIndex = referenceExpr.getIndex();
  Schema topSchema = inputSchema.getFields().get(inputIndex).schema();
  Schema.Field accessedField = getFieldFromTopSchema(topSchema, oldFieldName, innerRecordNames);
  // An `assert` is skipped unless the JVM runs with -ea, so check explicitly and fail with context
  // instead of handing a null field to appendField.
  if (accessedField == null) {
    throw new IllegalStateException("Cannot resolve field '" + oldFieldName + "' via inner records "
        + innerRecordNames + " in the schema of input column at index " + inputIndex);
  }
  SchemaUtilities.appendField(newFieldName, accessedField, fieldAssembler);
}

/**
 * Appends the output field for a field access whose reference chain ends at a UDF call.
 *
 * <p>The appended field takes its type from the field access expression itself
 * ({@code rexFieldAccess.getType()}) and its nullability from
 * {@code SchemaUtilities.isFieldNullable(referenceExpr, inputSchema)}. The output name is resolved from the
 * accessed field's name and the next entry polled from {@code suggestedFieldNames}.
 *
 * @param rexFieldAccess the field access expression being visited
 * @param referenceExpr the UDF call the access chain is rooted at
 */
private void handleUDFFieldAccess(RexFieldAccess rexFieldAccess, RexCall referenceExpr) {
  final String resolvedName =
      SchemaUtilities.getFieldName(rexFieldAccess.getField().getName(), suggestedFieldNames.poll());

  final RelDataType accessType = rexFieldAccess.getType();
  final boolean nullable = SchemaUtilities.isFieldNullable(referenceExpr, inputSchema);

  // TODO: add field documentation (null is passed in its place for now)
  SchemaUtilities.appendField(resolvedName, accessType, null, fieldAssembler, nullable);
}

@Override
public RexNode visitSubQuery(RexSubQuery rexSubQuery) {
// TODO: implement this method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ private static void initializeTables() {

executeQuery("DROP TABLE IF EXISTS basedecimal");
executeQuery("CREATE TABLE IF NOT EXISTS basedecimal(decimal_col decimal(2,1))");

executeQuery(
"CREATE TABLE IF NOT EXISTS single_uniontypes(single uniontype<string>, struct_col struct<single:uniontype<string>>)");
}

private static void initializeUdfs() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2019-2024 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -560,6 +560,17 @@ public void testNullabliltyExtractUnionUDF() {
Assert.assertEquals(actual.toString(true), TestUtils.loadSchema("testNullabilityExtractUnionUDF-expected.avsc"));
}

/**
 * Checks the Avro schema derived for field references on {@code extract_union} calls over the
 * {@code single_uniontypes} table: a two-level reference ({@code .single.tag_0}) on the struct column and a
 * one-level reference ({@code .tag_0}) on the top-level union column.
 */
@Test
public void testSingleUnionFieldReference() {
  final String query =
      "select extract_union(struct_col).single.tag_0 as single_in_struct, extract_union(single).tag_0 as single from single_uniontypes";

  Schema derivedSchema = ViewToAvroSchemaConverter.create(hiveMetastoreClient).toAvroSchema(query);

  // Render the derived schema first, then load the expected fixture, and compare the two.
  String derivedJson = derivedSchema.toString(true);
  String expectedJson = TestUtils.loadSchema("testSingleUnionFieldReference-expected.avsc");
  Assert.assertEquals(derivedJson, expectedJson);
}

@Test(enabled = false)
public void testRenameToLowercase() {
String viewSql = "CREATE VIEW v AS " + "SELECT bc.Id AS id, bc.Array_Col AS array_col " + "FROM basecomplex bc "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type" : "record",
"name" : "SingleUniontypes",
"namespace" : "default.single_uniontypes",
"fields" : [ {
"name" : "single_in_struct",
"type" : [ "null", "string" ]
}, {
"name" : "single",
"type" : [ "null", "string" ]
} ]
}

0 comments on commit fe40ec3

Please sign in to comment.