Skip to content

Commit

Permalink
[codegen] EqualiserCodeGenerator supports ARRAY<ROW> (apache#3023)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Mar 15, 2024
1 parent 8a0aec6 commit 35acc4c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.paimon.codegen

import org.apache.paimon.codegen.GenerateUtils._
import org.apache.paimon.codegen.ScalarOperatorGens.generateEquals
import org.apache.paimon.codegen.ScalarOperatorGens.{generateEquals, generateRowEqualiser}
import org.apache.paimon.types.{BooleanType, DataType, RowType}
import org.apache.paimon.types.DataTypeChecks.{getFieldTypes, isCompositeType}
import org.apache.paimon.types.DataTypeChecks.isCompositeType
import org.apache.paimon.types.DataTypeRoot._
import org.apache.paimon.utils.TypeUtils.isPrimitive

Expand Down Expand Up @@ -136,19 +136,7 @@ class EqualiserCodeGenerator(fieldTypes: Array[DataType]) {
if (isInternalPrimitive(fieldType)) {
("", s"$leftFieldTerm == $rightFieldTerm")
} else if (isCompositeType(fieldType)) {
val equaliserGenerator =
new EqualiserCodeGenerator(getFieldTypes(fieldType).asScala.toArray)
val generatedEqualiser = equaliserGenerator.generateRecordEqualiser("fieldGeneratedEqualiser")
val generatedEqualiserTerm =
ctx.addReusableObject(generatedEqualiser, "fieldGeneratedEqualiser")
val equaliserTypeTerm = classOf[RecordEqualiser].getCanonicalName
val equaliserTerm = newName("equaliser")
ctx.addReusableMember(s"private $equaliserTypeTerm $equaliserTerm = null;")
ctx.addReusableInitStatement(
s"""
|$equaliserTerm = ($equaliserTypeTerm)
| $generatedEqualiserTerm.newInstance(this.getClass().getClassLoader());
|""".stripMargin)
val equaliserTerm = generateRowEqualiser(ctx, fieldType)
("", s"$equaliserTerm.equals($leftFieldTerm, $rightFieldTerm)")
} else {
val left = GeneratedExpression(leftFieldTerm, leftNullTerm, "", fieldType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ package org.apache.paimon.codegen
import org.apache.paimon.codegen.GenerateUtils._
import org.apache.paimon.data.serializer.InternalMapSerializer
import org.apache.paimon.types._
import org.apache.paimon.types.DataTypeChecks.{getFieldTypes, isCompositeType}
import org.apache.paimon.utils.InternalRowUtils
import org.apache.paimon.utils.TypeCheckUtils._
import org.apache.paimon.utils.TypeUtils.isInteroperable

import scala.collection.JavaConverters._

/**
* Utilities to generate SQL scalar operators, e.g. arithmetic operator, compare operator, equal
* operator, etc.
Expand Down Expand Up @@ -78,6 +81,10 @@ object ScalarOperatorGens {
// comparable types of same type
else if (isComparable(left.resultType) && canEqual) {
generateComparison(ctx, "==", left, right, resultType)
} else if (isCompositeType(left.resultType) && canEqual) {
val equaliserTerm = generateRowEqualiser(ctx, left.resultType)
generateOperatorIfNotNull(ctx, resultType, left, right)(
(leftTerm, rightTerm) => s"$equaliserTerm.equals($leftTerm, $rightTerm)")
}
// non comparable types
else {
Expand All @@ -95,6 +102,25 @@ object ScalarOperatorGens {
}
}

/**
 * Generates a [[RecordEqualiser]] for the fields of the given composite type and wires it into
 * the code generation context.
 *
 * The generated equaliser is registered in `ctx` as a reusable object, a private member of type
 * [[RecordEqualiser]] is declared (initially `null`), and a reusable init statement assigns that
 * member by calling `newInstance` on the registered object with the class loader of the
 * generated class.
 *
 * @param ctx
 *   the context that collects reusable objects, members and init statements
 * @param fieldType
 *   the composite type whose field types drive the generated equaliser
 * @return
 *   the name of the member holding the equaliser instance
 */
def generateRowEqualiser(ctx: CodeGeneratorContext, fieldType: DataType): String = {
  val nestedFieldTypes: Array[DataType] = getFieldTypes(fieldType).asScala.toArray
  val compiledEqualiser =
    new EqualiserCodeGenerator(nestedFieldTypes).generateRecordEqualiser("fieldGeneratedEqualiser")
  val compiledEqualiserRef = ctx.addReusableObject(compiledEqualiser, "fieldGeneratedEqualiser")

  val typeName = classOf[RecordEqualiser].getCanonicalName
  // The member name is allocated only after the reusable object has been registered, so the
  // sequence of name allocations stays the same as before.
  val memberName = newName("equaliser")

  val memberDeclaration = s"private $typeName $memberName = null;"
  val initStatement =
    s"""
       |$memberName = ($typeName)
       | $compiledEqualiserRef.newInstance(this.getClass().getClassLoader());
       |""".stripMargin

  ctx.addReusableMember(memberDeclaration)
  ctx.addReusableInitStatement(initStatement)
  memberName
}

/** Generates comparison code for numeric types and comparable types of same type. */
def generateComparison(
ctx: CodeGeneratorContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,33 @@ public void testUpdateAuditLog() throws Exception {
assertThat(sql("SELECT * FROM %s$audit_log", table))
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "+I", "1", "4", "5"));
}

/**
 * Checks the changelog emitted for a table with an {@code ARRAY<ROW<...>>} column when
 * {@code 'changelog-producer.row-deduplicate'} is enabled.
 *
 * <p>Three inserts are issued against the same streaming query: a first row for key 1, a changed
 * row for key 1, and finally the unchanged row for key 1 together with a new row for key 2. The
 * last step expects exactly one record (the insert for key 2), i.e. no changelog for the
 * unchanged key 1.
 */
@Test
public void testRowDeduplicateWithArrayRow() throws Exception {
    String table = "T_ARRAY_ROW";
    tEnv.executeSql(
            "CREATE TABLE IF NOT EXISTS "
                    + table
                    + "("
                    + "ID INT PRIMARY KEY NOT ENFORCED,\n"
                    + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
                    + ") WITH ("
                    + "'changelog-producer'='full-compaction',"
                    + "'changelog-producer.compaction-interval' = '1s',"
                    + "'changelog-producer.row-deduplicate' = 'true')");

    // try-with-resources closes the streaming iterator even when an assertion fails, so the
    // streaming query does not outlive this test.
    try (BlockingIterator<Row, Row> iterator =
            BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table))) {
        // First write for key 1: a plain insert is expected.
        sql("INSERT INTO %s VALUES (1, ARRAY[('a','mark1')]);", table);
        assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
                .containsExactlyInAnyOrder("+I[1, [+I[a, mark1]]]");

        // Changed array content for key 1: an update-before/update-after pair is expected.
        sql("INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')])", table);
        assertThat(iterator.collect(2).stream().map(Row::toString).collect(Collectors.toList()))
                .containsExactly("-U[1, [+I[a, mark1]]]", "+U[1, [+I[b, mark2]]]");

        // Key 1 is rewritten with identical content, key 2 is new: only the insert for key 2
        // is expected.
        sql(
                "INSERT INTO %s VALUES (1, ARRAY[('b', 'mark2')]), (2, ARRAY[('c', 'mark3')])",
                table);
        assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
                .containsExactly("+I[2, [+I[c, mark3]]]");
    }
}
}

0 comments on commit 35acc4c

Please sign in to comment.