diff --git a/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java b/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java index 244e931e48..139b1a83fb 100644 --- a/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java +++ b/core/src/main/java/com/netease/arctic/utils/ExpressionUtil.java @@ -71,7 +71,11 @@ public static Expression convertPartitionDataToDataFilter( Class resultType = partitionField.transform().getResultType(sourceField.type()).typeId().javaClass(); Object partitionValue = partition.get(i, resultType); - filter = Expressions.and(filter, Expressions.equal(transform, partitionValue)); + if (partitionValue != null) { + filter = Expressions.and(filter, Expressions.equal(transform, partitionValue)); + } else { + filter = Expressions.and(filter, Expressions.isNull(transform)); + } } return filter; } diff --git a/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java b/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java index 6a51a00a46..35fa553c1b 100644 --- a/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java +++ b/core/src/test/java/com/netease/arctic/BasicTableTestHelper.java @@ -100,6 +100,15 @@ public BasicTableTestHelper(boolean hasPrimaryKey, PartitionSpec partitionSpec) buildTableFormat(DEFAULT_FILE_FORMAT_DEFAULT)); } + public BasicTableTestHelper( + Schema tableSchema, boolean hasPrimaryKey, PartitionSpec partitionSpec) { + this( + tableSchema, + hasPrimaryKey ? PRIMARY_KEY_SPEC : PrimaryKeySpec.noPrimaryKey(), + partitionSpec, + buildTableFormat(DEFAULT_FILE_FORMAT_DEFAULT)); + } + @Override public Schema tableSchema() { return tableSchema; diff --git a/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java b/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java index a50e3084a4..08a585bbc0 100644 --- a/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java +++ b/core/src/test/java/com/netease/arctic/utils/TestKeyedExpressionUtil.java @@ -18,8 +18,6 @@ package com.netease.arctic.utils; -import static com.netease.arctic.BasicTableTestHelper.TABLE_SCHEMA; - import com.netease.arctic.BasicTableTestHelper; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.catalog.BasicCatalogTestHelper; @@ -29,11 +27,13 @@ import com.netease.arctic.scan.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -51,9 +51,16 @@ public class TestKeyedExpressionUtil extends TableTestBase { public TestKeyedExpressionUtil(PartitionSpec partitionSpec) { super( new BasicCatalogTestHelper(TableFormat.MIXED_ICEBERG), - new BasicTableTestHelper(true, partitionSpec)); + new BasicTableTestHelper(TABLE_SCHEMA, true, partitionSpec)); } + public static final Schema TABLE_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "ts", Types.LongType.get()), + Types.NestedField.optional(4, "op_time", Types.TimestampType.withoutZone())); + @Parameterized.Parameters(name = "{0}") public static Object[] parameters() { return new Object[][] { @@ -76,29 +83,25 @@ public void testKeyedConvertPartitionStructLikeToDataFilter() { // hash("111") = -210118348, hash("222") = -699778209 tableTestHelper().generateTestRecord(1, "111", 1, "2021-01-01T01:00:00"), tableTestHelper().generateTestRecord(2, "111", 1, "2021-01-01T01:00:00"), - tableTestHelper().generateTestRecord(3, "222", 11, "2022-02-02T02:00:00"), - tableTestHelper().generateTestRecord(4, "222", 11, "2022-02-02T02:00:00")); + tableTestHelper().generateTestRecord(3, "222", 11, null), + tableTestHelper().generateTestRecord(4, "222", 11, null)); ArrayList changeStoreRecords = Lists.newArrayList( tableTestHelper().generateTestRecord(5, "111", 1, "2021-01-01T01:00:00"), tableTestHelper().generateTestRecord(6, "111", 1, "2021-01-01T01:00:00"), - tableTestHelper().generateTestRecord(7, "222", 11, "2022-02-02T02:00:00"), - tableTestHelper().generateTestRecord(8, "222", 11, "2022-02-02T02:00:00")); + tableTestHelper().generateTestRecord(7, "222", 11, null), + tableTestHelper().generateTestRecord(8, "222", 11, null)); // 4 files List baseStoreFiles = MixedDataTestHelpers.writeAndCommitBaseStore(getArcticTable(), 1L, baseStoreRecords, true); - // identity: opTime=2021-01-01T01:00:00; bucket: id_bucket=0; - // truncate: ts_trunc=0; year: op_time_year=2021; month: op_time_month=2021-01; - // day: op_time_day=2021-01-01; hour: op_time_hour=2021-01-01-01 - DataFile sampleFile = baseStoreFiles.get(0); - MixedDataTestHelpers.writeAndCommitChangeStore( getArcticTable().asKeyedTable(), 2L, ChangeAction.INSERT, changeStoreRecords, true); - - Expression partitionFilter = - ExpressionUtil.convertPartitionDataToDataFilter( - getArcticTable(), sampleFile.specId(), Sets.newHashSet(sampleFile.partition())); - assertPlanHalfWithPartitionFilter(partitionFilter); + for (DataFile baseStoreFile : baseStoreFiles) { + Expression partitionFilter = + ExpressionUtil.convertPartitionDataToDataFilter( + getArcticTable(), baseStoreFile.specId(), Sets.newHashSet(baseStoreFile.partition())); + assertPlanHalfWithPartitionFilter(partitionFilter); + } } private void assertPlanHalfWithPartitionFilter(Expression partitionFilter) { @@ -118,6 +121,22 @@ private void assertPlanHalfWithPartitionFilter(Expression partitionFilter) { } catch (IOException e) { throw new RuntimeException(e); } + baseDataFiles.clear(); + insertFiles.clear(); + try (CloseableIterable it = + getArcticTable().asKeyedTable().newScan().planTasks()) { + it.forEach( + cst -> + cst.tasks() + .forEach( + t -> { + t.baseTasks().forEach(fileTask -> baseDataFiles.add(fileTask.file())); + t.insertTasks().forEach(fileTask -> insertFiles.add(fileTask.file())); + })); + } catch (IOException e) { + throw new RuntimeException(e); + } + Assert.assertEquals(4, baseDataFiles.size()); Assert.assertEquals(4, insertFiles.size()); baseDataFiles.clear(); diff --git a/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java b/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java index 2aa26b1979..81676e2303 100644 --- a/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java +++ b/core/src/test/java/com/netease/arctic/utils/TestUnkeyedExpressionUtil.java @@ -18,23 +18,24 @@ package com.netease.arctic.utils; -import static com.netease.arctic.BasicTableTestHelper.TABLE_SCHEMA; - import com.netease.arctic.BasicTableTestHelper; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.catalog.BasicCatalogTestHelper; import com.netease.arctic.catalog.TableTestBase; import com.netease.arctic.io.IcebergDataTestHelpers; +import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.UnkeyedTable; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; import org.junit.Assert; import org.junit.Assume; import org.junit.Test; @@ -49,7 +50,21 @@ public class TestUnkeyedExpressionUtil extends TableTestBase { public TestUnkeyedExpressionUtil(TableFormat tableFormat, PartitionSpec partitionSpec) { - super(new BasicCatalogTestHelper(tableFormat), new BasicTableTestHelper(false, partitionSpec)); + super( + new BasicCatalogTestHelper(tableFormat), + new BasicTableTestHelper(TABLE_SCHEMA, false, partitionSpec)); + } + + public static final Schema TABLE_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "ts", Types.LongType.get()), + Types.NestedField.optional(4, "op_time", Types.TimestampType.withoutZone())); + + @Override + protected ArcticTable getArcticTable() { + return super.getArcticTable(); } @Parameterized.Parameters(name = "{0}, {1}") @@ -88,8 +103,8 @@ public void testUnkeyedConvertPartitionStructLikeToDataFilter() throws IOExcepti // hash("111") = -210118348, hash("222") = -699778209 tableTestHelper().generateTestRecord(1, "111", 1, "2021-01-01T01:00:00"), tableTestHelper().generateTestRecord(2, "111", 1, "2021-01-01T01:00:00"), - tableTestHelper().generateTestRecord(3, "222", 11, "2022-02-02T02:00:00"), - tableTestHelper().generateTestRecord(4, "222", 11, "2022-02-02T02:00:00")); + tableTestHelper().generateTestRecord(3, "222", 11, null), + tableTestHelper().generateTestRecord(4, "222", 11, null)); // 2 files for partition table, 1 file for unpartition table DataFile[] dataFiles = IcebergDataTestHelpers.insert(table, records).dataFiles(); AppendFiles appendFiles = table.newAppend(); @@ -97,14 +112,12 @@ public void testUnkeyedConvertPartitionStructLikeToDataFilter() throws IOExcepti appendFiles.appendFile(dataFile); } appendFiles.commit(); - // identity: opTime=2021-01-01T01:00:00; bucket: id_bucket=0; - // truncate: ts_trunc=0; year: op_time_year=2021; month: op_time_month=2021-01; - // day: op_time_day=2021-01-01; hour: op_time_hour=2021-01-01-01 - DataFile sampleFile = dataFiles[0]; - Expression partitionFilter = - ExpressionUtil.convertPartitionDataToDataFilter( - getArcticTable(), sampleFile.specId(), Sets.newHashSet(sampleFile.partition())); - assertPlanHalfWithPartitionFilter(partitionFilter); + for (DataFile dataFile : dataFiles) { + Expression partitionFilter = + ExpressionUtil.convertPartitionDataToDataFilter( + getArcticTable(), dataFile.specId(), Sets.newHashSet(dataFile.partition())); + assertPlanHalfWithPartitionFilter(partitionFilter); + } } private void assertPlanHalfWithPartitionFilter(Expression partitionFilter) {