From 35e60b056246d7317d7d2f57f6f87a833badc7e2 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Thu, 1 Aug 2024 13:42:01 +0800 Subject: [PATCH 1/9] IcebergConversions addType for Timestamp and Time --- .../iceberg/manifest/IcebergConversions.java | 33 +++ .../iceberg/metadata/IcebergDataField.java | 6 + .../IcebergDataTypeCompatibilityTest.java | 206 ++++++++++++++++++ 3 files changed, 245 insertions(+) create mode 100644 paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index fb83dd52cec1..0a3a0932f4cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -19,7 +19,10 @@ package org.apache.paimon.iceberg.manifest; import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.TimestampType; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -42,6 +45,9 @@ private IcebergConversions() {} ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder); public static ByteBuffer toByteBuffer(DataType type, Object value) { + int precision; + Timestamp timestamp; + long timestampValue = 0; switch (type.getTypeRoot()) { case BOOLEAN: return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00); @@ -74,6 +80,33 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { case DECIMAL: Decimal decimal = (Decimal) value; return ByteBuffer.wrap((decimal.toUnscaledBytes())); + case TIMESTAMP_WITHOUT_TIME_ZONE: + final TimestampType timestampType = (TimestampType) type; + precision = timestampType.getPrecision(); + timestamp = (Timestamp) value; + if (precision <= 6) { + timestampValue = timestamp.getMillisecond(); + } else if (precision > 6) { + timestampValue = timestamp.getNanoOfMillisecond(); + } + return ByteBuffer.allocate(8) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(timestampValue); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; + precision = localTimestampType.getPrecision(); + timestamp = (Timestamp) value; + if (precision <= 6) { + timestampValue = timestamp.getMillisecond(); + } else if (precision > 6) { + timestampValue = timestamp.getNanoOfMillisecond(); + } + return ByteBuffer.allocate(8) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(timestampValue); + case TIME_WITHOUT_TIME_ZONE: + Long time = ((Integer) value).longValue(); + return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time); default: throw new UnsupportedOperationException("Cannot serialize type: " + type); } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java index fd05183b6dc9..fd99b396aa53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java @@ -130,6 +130,12 @@ private static String toTypeString(DataType dataType) { DecimalType decimalType = (DecimalType) dataType; return String.format( "decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return "timestamptz"; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return "timestamp"; + case TIME_WITHOUT_TIME_ZONE: + return "time"; default: throw new UnsupportedOperationException("Unsupported data type: " + dataType); } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java new file mode 100644 index 000000000000..d4db7ad0186e --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.*; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.time.LocalTime; +import java.util.*; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** Tests for Iceberg compatibility. */ +public class IcebergDataTypeCompatibilityTest { + + @TempDir java.nio.file.Path tempDir; + + @Test + public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"pt1", "pt2", "k", "v1", "v2"}); + + BiFunction binaryRow = + (pt1, pt2) -> { + BinaryRow b = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeTimestamp(0, pt1, 6); + writer.writeTimestamp(1, pt2, 6); + writer.complete(); + return b; + }; + + int numRecords = 1000; + ThreadLocalRandom random = ThreadLocalRandom.current(); + List testRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); + Timestamp pt2 = + DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + testRecords.add( + new TestRecord( + binaryRow.apply(pt1, pt2), + String.format("%s|%s|%s", pt1, pt2, k), + String.format("%d|%d", v1, v2), + GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2))); + } + + runCompatibilityTest( + rowType, + Arrays.asList("pt1", "pt2"), + Arrays.asList("pt1", "pt2", "k"), + testRecords); + } + + @Test + public void testPartitionedPrimaryKeyTable_Time() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TIMESTAMP(), + DataTypes.TIME(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"pt1", "pt2", "k", "v1", "v2"}); + + BiFunction binaryRow = + (pt1, pt2) -> { + BinaryRow b = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeTimestamp(0, pt1, 6); + writer.writeInt(1, pt2.getNano()); + writer.complete(); + return b; + }; + + int numRecords = 1000; + ThreadLocalRandom random = ThreadLocalRandom.current(); + List testRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); + LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000)); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + testRecords.add( + new TestRecord( + binaryRow.apply(pt1, pt2), + String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k), + String.format("%d|%d", v1, v2), + GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2))); + } + + runCompatibilityTest( + rowType, + Arrays.asList("pt1", "pt2"), + Arrays.asList("pt1", "pt2", "k"), + testRecords); + } + + private void runCompatibilityTest( + RowType rowType, + List partitionKeys, + List primaryKeys, + List testRecords) + throws Exception { + LocalFileIO fileIO = LocalFileIO.create(); + Path path = new Path(tempDir.toString()); + + Options options = new Options(); + if (!primaryKeys.isEmpty()) { + options.set(CoreOptions.BUCKET, 2); + } + options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); + options.set(CoreOptions.FILE_FORMAT, "avro"); + Schema schema = + new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); + + FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path); + paimonCatalog.createDatabase("mydb", false); + Identifier paimonIdentifier = Identifier.create("mydb", "t"); + paimonCatalog.createTable(paimonIdentifier, schema, false); + FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + Map expected = new HashMap<>(); + for (TestRecord testRecord : testRecords) { + expected.put(testRecord.key, testRecord.value); + write.write(testRecord.record); + } + + if (!primaryKeys.isEmpty()) { + for (BinaryRow partition : + testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) { + for (int b = 0; b < 2; b++) { + write.compact(partition, b, true); + } + } + } + commit.commit(1, write.prepareCommit(true, 1)); + write.close(); + commit.close(); + } + + private static class TestRecord { + private final BinaryRow partition; + private final String key; + private final String value; + private final GenericRow record; + + private TestRecord(BinaryRow partition, String key, String value, GenericRow record) { + this.partition = partition; + this.key = key; + this.value = value; + this.record = record; + } + } +} From b557f69f788690bf3421b977496a941494b35f06 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Mon, 5 Aug 2024 22:32:03 +0800 Subject: [PATCH 2/9] 1.Extract common convert method 2.merge test --- .../iceberg/manifest/IcebergConversions.java | 34 ++- .../iceberg/IcebergCompatibilityTest.java | 150 ++++++++++++- .../IcebergDataTypeCompatibilityTest.java | 206 ------------------ 3 files changed, 160 insertions(+), 230 deletions(-) delete mode 100644 paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 0a3a0932f4cf..41d4dd376848 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -47,7 +47,6 @@ private IcebergConversions() {} public static ByteBuffer toByteBuffer(DataType type, Object value) { int precision; Timestamp timestamp; - long timestampValue = 0; switch (type.getTypeRoot()) { case BOOLEAN: return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00); @@ -81,29 +80,15 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { Decimal decimal = (Decimal) value; return ByteBuffer.wrap((decimal.toUnscaledBytes())); case TIMESTAMP_WITHOUT_TIME_ZONE: - final TimestampType timestampType = (TimestampType) type; + TimestampType timestampType = (TimestampType) type; precision = timestampType.getPrecision(); timestamp = (Timestamp) value; - if (precision <= 6) { - timestampValue = timestamp.getMillisecond(); - } else if (precision > 6) { - timestampValue = timestamp.getNanoOfMillisecond(); - } - return ByteBuffer.allocate(8) - .order(ByteOrder.LITTLE_ENDIAN) - .putLong(timestampValue); + return convertTimestampWithPrecisionToBuffer(timestamp, precision); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - final LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; + LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; precision = localTimestampType.getPrecision(); timestamp = (Timestamp) value; - if (precision <= 6) { - timestampValue = timestamp.getMillisecond(); - } else if (precision > 6) { - timestampValue = timestamp.getNanoOfMillisecond(); - } - return ByteBuffer.allocate(8) - .order(ByteOrder.LITTLE_ENDIAN) - .putLong(timestampValue); + return convertTimestampWithPrecisionToBuffer(timestamp, precision); case TIME_WITHOUT_TIME_ZONE: Long time = ((Integer) value).longValue(); return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time); @@ -111,4 +96,15 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { throw new UnsupportedOperationException("Cannot serialize type: " + type); } } + + private static ByteBuffer convertTimestampWithPrecisionToBuffer( + Timestamp timestamp, int precision) { + long timestampValue; + if (precision <= 6) { + timestampValue = timestamp.getMillisecond(); + } else { + timestampValue = timestamp.getNanoOfMillisecond(); + } + return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(timestampValue); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 668d91d44bdf..af1ad141542d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -21,11 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.data.BinaryRowWriter; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.Decimal; -import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.*; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; @@ -36,6 +32,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.TableIdentifier; @@ -48,6 +45,7 @@ import java.nio.ByteBuffer; import java.time.LocalDate; +import java.time.LocalTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -271,6 +269,101 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { r -> ""); } + @Test + public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TIMESTAMP(), + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"pt1", "pt2", "k", "v1", "v2"}); + + BiFunction binaryRow = + (pt1, pt2) -> { + BinaryRow b = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeTimestamp(0, pt1, 6); + writer.writeTimestamp(1, pt2, 6); + writer.complete(); + return b; + }; + + int numRecords = 1000; + ThreadLocalRandom random = ThreadLocalRandom.current(); + List testRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); + Timestamp pt2 = + DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + testRecords.add( + new TestRecord( + binaryRow.apply(pt1, pt2), + String.format("%s|%s|%s", pt1, pt2, k), + String.format("%d|%d", v1, v2), + GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2))); + } + + runCompatibilityTestForTimeAndTimeStamp( + rowType, + Arrays.asList("pt1", "pt2"), + Arrays.asList("pt1", "pt2", "k"), + testRecords); + } + + @Test + public void testPartitionedPrimaryKeyTable_Time() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TIMESTAMP(), + DataTypes.TIME(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"pt1", "pt2", "k", "v1", "v2"}); + + BiFunction binaryRow = + (pt1, pt2) -> { + BinaryRow b = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeTimestamp(0, pt1, 6); + writer.writeInt(1, pt2.getNano()); + writer.complete(); + return b; + }; + + int numRecords = 1000; + ThreadLocalRandom random = ThreadLocalRandom.current(); + List testRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); + LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000)); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + testRecords.add( + new TestRecord( + binaryRow.apply(pt1, pt2), + String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k), + String.format("%d|%d", v1, v2), + GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2))); + } + + runCompatibilityTestForTimeAndTimeStamp( + rowType, + Arrays.asList("pt1", "pt2"), + Arrays.asList("pt1", "pt2", "k"), + testRecords); + } + private void runCompatibilityTest( RowType rowType, List partitionKeys, @@ -332,6 +425,53 @@ private void runCompatibilityTest( assertThat(actual).isEqualTo(expected); } + private void runCompatibilityTestForTimeAndTimeStamp( + RowType rowType, + List partitionKeys, + List primaryKeys, + List testRecords) + throws Exception { + LocalFileIO fileIO = LocalFileIO.create(); + Path path = new Path(tempDir.toString()); + + Options options = new Options(); + if (!primaryKeys.isEmpty()) { + options.set(CoreOptions.BUCKET, 2); + } + options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); + options.set(CoreOptions.FILE_FORMAT, "avro"); + Schema schema = + new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); + + FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path); + paimonCatalog.createDatabase("mydb", false); + Identifier paimonIdentifier = Identifier.create("mydb", "t"); + paimonCatalog.createTable(paimonIdentifier, schema, false); + FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + Map expected = new HashMap<>(); + for (TestRecord testRecord : testRecords) { + expected.put(testRecord.key, testRecord.value); + write.write(testRecord.record); + } + + if (!primaryKeys.isEmpty()) { + for (BinaryRow partition : + testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) { + for (int b = 0; b < 2; b++) { + write.compact(partition, b, true); + } + } + } + commit.commit(1, write.prepareCommit(true, 1)); + write.close(); + commit.close(); + } + private static class TestRecord { private final BinaryRow partition; private final String key; diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java deleted file mode 100644 index d4db7ad0186e..000000000000 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.iceberg; - -import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.FileSystemCatalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.*; -import org.apache.paimon.fs.Path; -import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.Schema; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.sink.TableCommitImpl; -import org.apache.paimon.table.sink.TableWriteImpl; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.DateTimeUtils; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.time.LocalTime; -import java.util.*; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.BiFunction; -import java.util.stream.Collectors; - -/** Tests for Iceberg compatibility. */ -public class IcebergDataTypeCompatibilityTest { - - @TempDir java.nio.file.Path tempDir; - - @Test - public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception { - RowType rowType = - RowType.of( - new DataType[] { - DataTypes.TIMESTAMP(), - DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.BIGINT() - }, - new String[] {"pt1", "pt2", "k", "v1", "v2"}); - - BiFunction binaryRow = - (pt1, pt2) -> { - BinaryRow b = new BinaryRow(2); - BinaryRowWriter writer = new BinaryRowWriter(b); - writer.writeTimestamp(0, pt1, 6); - writer.writeTimestamp(1, pt2, 6); - writer.complete(); - return b; - }; - - int numRecords = 1000; - ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); - Timestamp pt2 = - DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE); - String k = String.valueOf(random.nextInt(0, 100)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - binaryRow.apply(pt1, pt2), - String.format("%s|%s|%s", pt1, pt2, k), - String.format("%d|%d", v1, v2), - GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2))); - } - - runCompatibilityTest( - rowType, - Arrays.asList("pt1", "pt2"), - Arrays.asList("pt1", "pt2", "k"), - testRecords); - } - - @Test - public void testPartitionedPrimaryKeyTable_Time() throws Exception { - RowType rowType = - RowType.of( - new DataType[] { - DataTypes.TIMESTAMP(), - DataTypes.TIME(), - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.BIGINT() - }, - new String[] {"pt1", "pt2", "k", "v1", "v2"}); - - BiFunction binaryRow = - (pt1, pt2) -> { - BinaryRow b = new BinaryRow(2); - BinaryRowWriter writer = new BinaryRowWriter(b); - writer.writeTimestamp(0, pt1, 6); - writer.writeInt(1, pt2.getNano()); - writer.complete(); - return b; - }; - - int numRecords = 1000; - ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); - LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000)); - String k = String.valueOf(random.nextInt(0, 100)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - binaryRow.apply(pt1, pt2), - String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k), - String.format("%d|%d", v1, v2), - GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2))); - } - - runCompatibilityTest( - rowType, - Arrays.asList("pt1", "pt2"), - Arrays.asList("pt1", "pt2", "k"), - testRecords); - } - - private void runCompatibilityTest( - RowType rowType, - List partitionKeys, - List primaryKeys, - List testRecords) - throws Exception { - LocalFileIO fileIO = LocalFileIO.create(); - Path path = new Path(tempDir.toString()); - - Options options = new Options(); - if (!primaryKeys.isEmpty()) { - options.set(CoreOptions.BUCKET, 2); - } - options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); - options.set(CoreOptions.FILE_FORMAT, "avro"); - Schema schema = - new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); - - FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path); - paimonCatalog.createDatabase("mydb", false); - Identifier paimonIdentifier = Identifier.create("mydb", "t"); - paimonCatalog.createTable(paimonIdentifier, schema, false); - FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); - - String commitUser = UUID.randomUUID().toString(); - TableWriteImpl write = table.newWrite(commitUser); - TableCommitImpl commit = table.newCommit(commitUser); - - Map expected = new HashMap<>(); - for (TestRecord testRecord : testRecords) { - expected.put(testRecord.key, testRecord.value); - write.write(testRecord.record); - } - - if (!primaryKeys.isEmpty()) { - for (BinaryRow partition : - testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) { - for (int b = 0; b < 2; b++) { - write.compact(partition, b, true); - } - } - } - commit.commit(1, write.prepareCommit(true, 1)); - write.close(); - commit.close(); - } - - private static class TestRecord { - private final BinaryRow partition; - private final String key; - private final String value; - private final GenericRow record; - - private TestRecord(BinaryRow partition, String key, String value, GenericRow record) { - this.partition = partition; - this.key = key; - this.value = value; - this.record = record; - } - } -} From b807d4ebb8e8b0174e8056de7af3693845ec3094 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Mon, 5 Aug 2024 22:50:15 +0800 Subject: [PATCH 3/9] fix code style --- .../apache/paimon/iceberg/IcebergCompatibilityTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index af1ad141542d..ba1af12fbac0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -21,7 +21,12 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.data.*; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; From d853fe93ad19f13c63c4e35fe9e27455425ca010 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Tue, 6 Aug 2024 11:24:14 +0800 Subject: [PATCH 4/9] remove local variables --- .../paimon/iceberg/manifest/IcebergConversions.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 41d4dd376848..1dde2a774fce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -45,8 +45,6 @@ private IcebergConversions() {} ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder); public static ByteBuffer toByteBuffer(DataType type, Object value) { - int precision; - Timestamp timestamp; switch (type.getTypeRoot()) { case BOOLEAN: return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00); @@ -81,14 +79,10 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { return ByteBuffer.wrap((decimal.toUnscaledBytes())); case TIMESTAMP_WITHOUT_TIME_ZONE: TimestampType timestampType = (TimestampType) type; - precision = timestampType.getPrecision(); - timestamp = (Timestamp) value; - return convertTimestampWithPrecisionToBuffer(timestamp, precision); + return convertTimestampWithPrecisionToBuffer((Timestamp) value, timestampType.getPrecision()); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; - precision = localTimestampType.getPrecision(); - timestamp = (Timestamp) value; - return convertTimestampWithPrecisionToBuffer(timestamp, precision); + return convertTimestampWithPrecisionToBuffer((Timestamp) value, localTimestampType.getPrecision()); case TIME_WITHOUT_TIME_ZONE: Long time = ((Integer) value).longValue(); return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time); From 9ead1fffad466e33835185dffa6b62d7dd130949 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Tue, 6 Aug 2024 17:54:47 +0800 Subject: [PATCH 5/9] fix bug --- .../iceberg/manifest/IcebergConversions.java | 18 +- .../iceberg/IcebergCompatibilityTest.java | 191 +++++------------- 2 files changed, 59 insertions(+), 150 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 1dde2a774fce..4bfb44e26607 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -79,13 +79,17 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { return ByteBuffer.wrap((decimal.toUnscaledBytes())); case TIMESTAMP_WITHOUT_TIME_ZONE: TimestampType timestampType = (TimestampType) type; - return convertTimestampWithPrecisionToBuffer((Timestamp) value, timestampType.getPrecision()); + return convertTimestampWithPrecisionToBuffer( + (Timestamp) value, timestampType.getPrecision()); case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; - return convertTimestampWithPrecisionToBuffer((Timestamp) value, localTimestampType.getPrecision()); + return convertTimestampWithPrecisionToBuffer( + (Timestamp) value, localTimestampType.getPrecision()); case TIME_WITHOUT_TIME_ZONE: - Long time = ((Integer) value).longValue(); - return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time); + long microsecondsFromMidnight = (Integer) value / 1_000; + return ByteBuffer.allocate(8) + .order(ByteOrder.LITTLE_ENDIAN) + .putLong(0, microsecondsFromMidnight); default: throw new UnsupportedOperationException("Cannot serialize type: " + type); } @@ -94,10 +98,12 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { private static ByteBuffer convertTimestampWithPrecisionToBuffer( Timestamp timestamp, int precision) { long timestampValue; + long secondsSinceEpoch = timestamp.toInstant().getEpochSecond(); if (precision <= 6) { - timestampValue = timestamp.getMillisecond(); + timestampValue = + secondsSinceEpoch * 1_000_000 + timestamp.getNanoOfMillisecond() / 1_000; } else { - timestampValue = timestamp.getNanoOfMillisecond(); + timestampValue = secondsSinceEpoch * 1_000_000_000 + timestamp.getNanoOfMillisecond(); } return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(timestampValue); } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index ba1af12fbac0..febfb4271754 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -37,7 +37,6 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.DateTimeUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.catalog.TableIdentifier; @@ -50,7 +49,7 @@ import java.nio.ByteBuffer; import java.time.LocalDate; -import java.time.LocalTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -163,6 +162,52 @@ public void testPartitionedPrimaryKeyTable() throws Exception { r -> String.format("%d|%d", r.get(3, Integer.class), r.get(4, Long.class))); } + @Test + public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TIMESTAMP(6), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"pt", "k", "v1", "v2"}); + + Function binaryRow = + (pt) -> { + BinaryRow b = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeTimestamp(0, pt, 6); + writer.complete(); + return b; + }; + + int numRecords = 1000; + ThreadLocalRandom random = ThreadLocalRandom.current(); + List testRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + testRecords.add( + new TestRecord( + binaryRow.apply(pt), + String.format("%s|%s", pt.toInstant().atOffset(ZoneOffset.UTC), k), + String.format("%d|%d", v1, v2), + GenericRow.of(pt, BinaryString.fromString(k), v1, v2))); + } + + runCompatibilityTest( + rowType, + Arrays.asList("pt"), + Arrays.asList("pt", "k"), + testRecords, + r -> String.format("%s|%s", r.get(0), r.get(1)), + r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class))); + } + @Test public void testAppendOnlyTableWithAllTypes() throws Exception { RowType rowType = @@ -274,101 +319,6 @@ public void testAppendOnlyTableWithAllTypes() throws Exception { r -> ""); } - @Test - public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception { - RowType rowType = - RowType.of( - new DataType[] { - DataTypes.TIMESTAMP(), - DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(), - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.BIGINT() - }, - new String[] {"pt1", "pt2", "k", "v1", "v2"}); - - BiFunction binaryRow = - (pt1, pt2) -> { - BinaryRow b = new BinaryRow(2); - BinaryRowWriter writer = new BinaryRowWriter(b); - writer.writeTimestamp(0, pt1, 6); - writer.writeTimestamp(1, pt2, 6); - writer.complete(); - return b; - }; - - int numRecords = 1000; - ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); - Timestamp pt2 = - DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE); - String k = String.valueOf(random.nextInt(0, 100)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - binaryRow.apply(pt1, pt2), - String.format("%s|%s|%s", pt1, pt2, k), - String.format("%d|%d", v1, v2), - GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2))); - } - - runCompatibilityTestForTimeAndTimeStamp( - rowType, - Arrays.asList("pt1", "pt2"), - Arrays.asList("pt1", "pt2", "k"), - testRecords); - } - - @Test - public void testPartitionedPrimaryKeyTable_Time() throws Exception { - RowType rowType = - RowType.of( - new DataType[] { - DataTypes.TIMESTAMP(), - DataTypes.TIME(), - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.BIGINT() - }, - new String[] {"pt1", "pt2", "k", "v1", "v2"}); - - BiFunction binaryRow = - (pt1, pt2) -> { - BinaryRow b = new BinaryRow(2); - BinaryRowWriter writer = new BinaryRowWriter(b); - writer.writeTimestamp(0, pt1, 6); - writer.writeInt(1, pt2.getNano()); - writer.complete(); - return b; - }; - - int numRecords = 1000; - ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); - LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000)); - String k = String.valueOf(random.nextInt(0, 100)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - binaryRow.apply(pt1, pt2), - String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k), - String.format("%d|%d", v1, v2), - GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2))); - } - - runCompatibilityTestForTimeAndTimeStamp( - rowType, - Arrays.asList("pt1", "pt2"), - Arrays.asList("pt1", "pt2", "k"), - testRecords); - } - private void runCompatibilityTest( RowType rowType, List partitionKeys, @@ -430,53 +380,6 @@ private void runCompatibilityTest( assertThat(actual).isEqualTo(expected); } - private void runCompatibilityTestForTimeAndTimeStamp( - RowType rowType, - List partitionKeys, - List primaryKeys, - List testRecords) - throws Exception { - LocalFileIO fileIO = LocalFileIO.create(); - Path path = new Path(tempDir.toString()); - - Options options = new Options(); - if (!primaryKeys.isEmpty()) { - options.set(CoreOptions.BUCKET, 2); - } - options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); - options.set(CoreOptions.FILE_FORMAT, "avro"); - Schema schema = - new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); - - FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path); - paimonCatalog.createDatabase("mydb", false); - Identifier paimonIdentifier = Identifier.create("mydb", "t"); - paimonCatalog.createTable(paimonIdentifier, schema, false); - FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); - - String commitUser = UUID.randomUUID().toString(); - TableWriteImpl write = table.newWrite(commitUser); - TableCommitImpl commit = table.newCommit(commitUser); - - Map expected = new HashMap<>(); - for (TestRecord testRecord : testRecords) { - expected.put(testRecord.key, testRecord.value); - write.write(testRecord.record); - } - - if (!primaryKeys.isEmpty()) { - for (BinaryRow partition : - testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) { - for (int b = 0; b < 2; b++) { - write.compact(partition, b, true); - } - } - } - commit.commit(1, write.prepareCommit(true, 1)); - write.close(); - commit.close(); - } - private static class TestRecord { private final BinaryRow partition; private final String key; From 09dfb486e29792964ad1f4686f2fdfe078679461 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Mon, 12 Aug 2024 23:12:52 +0800 Subject: [PATCH 6/9] fix Precision --- .../iceberg/manifest/IcebergConversions.java | 11 ++++----- .../iceberg/IcebergCompatibilityTest.java | 23 +++++++++++++++++-- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 4bfb44e26607..30dfc96cfd5c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -86,7 +86,7 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { return convertTimestampWithPrecisionToBuffer( (Timestamp) value, localTimestampType.getPrecision()); case TIME_WITHOUT_TIME_ZONE: - long microsecondsFromMidnight = (Integer) value / 1_000; + long microsecondsFromMidnight = (Long) value / 1_000; return ByteBuffer.allocate(8) .order(ByteOrder.LITTLE_ENDIAN) .putLong(0, microsecondsFromMidnight); @@ -98,12 +98,11 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { private static ByteBuffer convertTimestampWithPrecisionToBuffer( Timestamp timestamp, int precision) { long timestampValue; - long secondsSinceEpoch = timestamp.toInstant().getEpochSecond(); - if (precision <= 6) { - timestampValue = - secondsSinceEpoch * 1_000_000 + timestamp.getNanoOfMillisecond() / 1_000; + if (precision <= 3) { + timestampValue = timestamp.getMillisecond() * 1_000_000; } else { - timestampValue = secondsSinceEpoch * 1_000_000_000 + timestamp.getNanoOfMillisecond(); + timestampValue = + timestamp.getMillisecond() * 1_000_000 + timestamp.getNanoOfMillisecond(); } return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(timestampValue); } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index febfb4271754..ff7f3b2cae36 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -174,6 +174,7 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { }, new String[] {"pt", "k", "v1", "v2"}); + List testRecords = new ArrayList<>(); Function binaryRow = (pt) -> { BinaryRow b = new BinaryRow(2); @@ -184,10 +185,11 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { }; int numRecords = 1000; + int[] precisions = {3, 6}; ThreadLocalRandom random = ThreadLocalRandom.current(); - List testRecords = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { - Timestamp pt = Timestamp.fromEpochMillis(random.nextInt(0, 99999)); + Timestamp pt = + generateRandomTimestamp(random, precisions[random.nextInt(precisions.length)]); String k = String.valueOf(random.nextInt(0, 100)); int v1 = random.nextInt(); long v2 = random.nextLong(); @@ -208,6 +210,23 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class))); } + private Timestamp generateRandomTimestamp(ThreadLocalRandom random, int precision) { + long milliseconds = random.nextLong(0, 10_000_000_000_000L); + int nanoAdjustment; + switch (precision) { + case 3: + nanoAdjustment = 0; + break; + case 6: + nanoAdjustment = random.nextInt(0, 1_000) * 1000; + break; + default: + throw new IllegalArgumentException("Unsupported precision: " + precision); + } + + return Timestamp.fromEpochMillis(milliseconds, nanoAdjustment); + } + @Test public void testAppendOnlyTableWithAllTypes() throws Exception { RowType rowType = From 08929e8226c935d46109aa7347b35c914024d554 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Tue, 27 Aug 2024 20:32:04 +0800 Subject: [PATCH 7/9] resolve conflicts --- .../iceberg/IcebergCompatibilityTest.java | 74 ++++++++++++------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index dbd7c77ab078..6ccba7d296a4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -37,6 +37,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.DataType; @@ -319,8 +320,8 @@ public void testPartitionedPrimaryKeyTable() throws Exception { return b; }; - int numRounds = 20; - int numRecords = 500; + int numRounds = 2; + int numRecords = 1; ThreadLocalRandom random = ThreadLocalRandom.current(); boolean samePartitionEachRound = random.nextBoolean(); @@ -368,38 +369,54 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.TIMESTAMP(6), - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.BIGINT() + DataTypes.TIMESTAMP(6), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() }, new String[] {"pt", "k", "v1", "v2"}); - List testRecords = new ArrayList<>(); - Function binaryRow = - (pt) -> { + BiFunction binaryRow = + (pt, k) -> { BinaryRow b = new BinaryRow(2); BinaryRowWriter writer = new BinaryRowWriter(b); writer.writeTimestamp(0, pt, 6); + writer.writeString(1, BinaryString.fromString(k)); writer.complete(); return b; }; - int numRecords = 1000; - int[] precisions = {3, 6}; + int numRounds = 2; + int numRecords = 1; ThreadLocalRandom random = ThreadLocalRandom.current(); - for (int i = 0; i < numRecords; i++) { - Timestamp pt = - generateRandomTimestamp(random, precisions[random.nextInt(precisions.length)]); - String k = String.valueOf(random.nextInt(0, 100)); - int v1 = random.nextInt(); - long v2 = random.nextLong(); - testRecords.add( - new TestRecord( - binaryRow.apply(pt), - String.format("%s|%s", pt.toInstant().atOffset(ZoneOffset.UTC), k), - String.format("%d|%d", v1, v2), - GenericRow.of(pt, BinaryString.fromString(k), v1, v2))); + + List> testRecords = new ArrayList<>(); + List> expected = new ArrayList<>(); + Map expectedMap = new LinkedHashMap<>(); + + for (int r = 0; r < numRounds; r++) { + List round = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt = generateRandomTimestamp(random, random.nextBoolean() ? 3 : 6); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + + round.add( + new TestRecord( + binaryRow.apply(pt, k), + GenericRow.of(pt, BinaryString.fromString(k), v1, v2))); + + expectedMap.put( + String.format("%s, %s", pt.toInstant().atOffset(ZoneOffset.UTC), k), + String.format("%d, %d", v1, v2)); + } + + testRecords.add(round); + expected.add( + expectedMap.entrySet().stream() + .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue())) + .collect(Collectors.toList())); } runCompatibilityTest( @@ -407,8 +424,8 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { Arrays.asList("pt"), Arrays.asList("pt", "k"), testRecords, - r -> String.format("%s|%s", r.get(0), r.get(1)), - r -> String.format("%d|%d", r.get(2, Integer.class), r.get(3, Long.class))); + expected, + Record::toString); } private Timestamp generateRandomTimestamp(ThreadLocalRandom random, int precision) { @@ -563,7 +580,6 @@ private void runCompatibilityTest( String commitUser = UUID.randomUUID().toString(); TableWriteImpl write = table.newWrite(commitUser); TableCommitImpl commit = table.newCommit(commitUser); - for (int r = 0; r < testRecords.size(); r++) { List round = testRecords.get(r); for (TestRecord testRecord : round) { @@ -578,9 +594,11 @@ private void runCompatibilityTest( } } } - commit.commit(r, write.prepareCommit(true, r)); - assertThat(getIcebergResult(icebergRecordToString)).hasSameElementsAs(expected.get(r)); + commit.commit(r, write.prepareCommit(true, r)); + Thread.sleep(500); + System.out.println("结果:"+getIcebergResult(icebergRecordToString)); + // assertThat(getIcebergResult(icebergRecordToString)).hasSameElementsAs(expected.get(r)); } write.close(); From 161b95ca601deb8361917033d986defaa0fa378b Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Tue, 27 Aug 2024 20:32:14 +0800 Subject: [PATCH 8/9] resolve conflicts --- .../iceberg/manifest/IcebergConversions.java | 19 +++++++ .../iceberg/IcebergCompatibilityTest.java | 54 ++++++++++--------- 2 files changed, 48 insertions(+), 25 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 6920dad679c6..02cfe88af4bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -128,6 +128,14 @@ public static Object toPaimonObject(DataType type, byte[] bytes) { DecimalType decimalType = (DecimalType) type; return Decimal.fromUnscaledBytes( bytes, decimalType.getPrecision(), decimalType.getScale()); + case TIMESTAMP_WITHOUT_TIME_ZONE: + TimestampType timestampType = (TimestampType) type; + return convertBytesToTimestamp(bytes, timestampType.getPrecision()); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; + return convertBytesToTimestamp(bytes, localTimestampType.getPrecision()); + case TIME_WITHOUT_TIME_ZONE: + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong() * 1000; default: throw new UnsupportedOperationException("Cannot deserialize type: " + type); } @@ -144,4 +152,15 @@ private static ByteBuffer convertTimestampWithPrecisionToBuffer( } return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(timestampValue); } + + private static Timestamp convertBytesToTimestamp(byte[] bytes, int precision) { + long timestampValue = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong(); + long milliseconds = timestampValue / 1_000_000; + int nanosOfMillisecond = (int) (timestampValue % 1_000_000); + if (precision <= 3) { + return Timestamp.fromEpochMillis(milliseconds); + } else { + return Timestamp.fromEpochMillis(milliseconds, nanosOfMillisecond); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 6ccba7d296a4..02c7a38fa993 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -26,8 +26,8 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; -import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.data.Timestamp; +import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.MemorySize; @@ -37,7 +37,6 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.SinkRecord; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.DataType; @@ -320,8 +319,8 @@ public void testPartitionedPrimaryKeyTable() throws Exception { return b; }; - int numRounds = 2; - int numRecords = 1; + int numRounds = 20; + int numRecords = 500; ThreadLocalRandom random = ThreadLocalRandom.current(); boolean samePartitionEachRound = random.nextBoolean(); @@ -369,25 +368,26 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.TIMESTAMP(6), - DataTypes.STRING(), - DataTypes.INT(), - DataTypes.BIGINT() + DataTypes.TIMESTAMP(6), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() }, - new String[] {"pt", "k", "v1", "v2"}); + new String[] {"pt1", "pt2", "k", "v1", "v2"}); BiFunction binaryRow = - (pt, k) -> { + (pt1, pt2) -> { BinaryRow b = new BinaryRow(2); BinaryRowWriter writer = new BinaryRowWriter(b); - writer.writeTimestamp(0, pt, 6); - writer.writeString(1, BinaryString.fromString(k)); + writer.writeTimestamp(0, pt1, 6); + writer.writeString(1, BinaryString.fromString(pt2)); writer.complete(); return b; }; - int numRounds = 2; - int numRecords = 1; + int numRounds = 20; + int numRecords = 100; ThreadLocalRandom random = ThreadLocalRandom.current(); List> testRecords = new ArrayList<>(); @@ -397,18 +397,25 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { for (int r = 0; r < numRounds; r++) { List round = new ArrayList<>(); for (int i = 0; i < numRecords; i++) { - Timestamp pt = generateRandomTimestamp(random, random.nextBoolean() ? 3 : 6); + Timestamp pt1 = generateRandomTimestamp(random, random.nextBoolean() ? 3 : 6); + String pt2 = String.valueOf(random.nextInt(10, 12)); String k = String.valueOf(random.nextInt(0, 100)); int v1 = random.nextInt(); long v2 = random.nextLong(); round.add( new TestRecord( - binaryRow.apply(pt, k), - GenericRow.of(pt, BinaryString.fromString(k), v1, v2))); + binaryRow.apply(pt1, pt2), + GenericRow.of( + pt1, + BinaryString.fromString(pt2), + BinaryString.fromString(k), + v1, + v2))); expectedMap.put( - String.format("%s, %s", pt.toInstant().atOffset(ZoneOffset.UTC), k), + String.format( + "%s, %s, %s", pt1.toInstant().atOffset(ZoneOffset.UTC), pt2, k), String.format("%d, %d", v1, v2)); } @@ -421,15 +428,15 @@ public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { runCompatibilityTest( rowType, - Arrays.asList("pt"), - Arrays.asList("pt", "k"), + Arrays.asList("pt1", "pt2"), + Arrays.asList("pt1", "pt2", "k"), testRecords, expected, Record::toString); } private Timestamp generateRandomTimestamp(ThreadLocalRandom random, int precision) { - long milliseconds = random.nextLong(0, 10_000_000_000_000L); + long milliseconds = random.nextLong(0, Long.MAX_VALUE / 1000_000 - 1_000 * 1000); int nanoAdjustment; switch (precision) { case 3: @@ -594,11 +601,8 @@ private void runCompatibilityTest( } } } - commit.commit(r, write.prepareCommit(true, r)); - Thread.sleep(500); - System.out.println("结果:"+getIcebergResult(icebergRecordToString)); - // assertThat(getIcebergResult(icebergRecordToString)).hasSameElementsAs(expected.get(r)); + assertThat(getIcebergResult(icebergRecordToString)).hasSameElementsAs(expected.get(r)); } write.close(); From 8725fbcdb5582b5704fa804c04019c0a1973fc06 Mon Sep 17 00:00:00 2001 From: lihongyi <642826683@qq.com> Date: Fri, 30 Aug 2024 20:33:14 +0800 Subject: [PATCH 9/9] resolve conflicts --- .../iceberg/manifest/IcebergConversions.java | 6 +- .../iceberg/metadata/IcebergDataField.java | 4 +- .../iceberg/IcebergCompatibilityTest.java | 125 ++++++++++++++++-- 3 files changed, 116 insertions(+), 19 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java index 02cfe88af4bc..9e8a96f0125f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java @@ -91,10 +91,10 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) { return convertTimestampWithPrecisionToBuffer( (Timestamp) value, localTimestampType.getPrecision()); case TIME_WITHOUT_TIME_ZONE: - long microsecondsFromMidnight = (Long) value / 1_000; + long microsecondsFromMillis = (int) value * 1_000; return ByteBuffer.allocate(8) .order(ByteOrder.LITTLE_ENDIAN) - .putLong(0, microsecondsFromMidnight); + .putLong(0, microsecondsFromMillis); default: throw new UnsupportedOperationException("Cannot serialize type: " + type); } @@ -135,7 +135,7 @@ public static Object toPaimonObject(DataType type, byte[] bytes) { LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type; return convertBytesToTimestamp(bytes, localTimestampType.getPrecision()); case TIME_WITHOUT_TIME_ZONE: - return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong() * 1000; + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong() / 1000; default: throw new UnsupportedOperationException("Cannot deserialize type: " + type); } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java index fd99b396aa53..6de660823723 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java @@ -131,9 +131,9 @@ private static String toTypeString(DataType dataType) { return String.format( "decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale()); case TIMESTAMP_WITHOUT_TIME_ZONE: - return "timestamptz"; - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return "timestamp"; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return "timestamptz"; case TIME_WITHOUT_TIME_ZONE: return "time"; default: diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 141def4e224a..0eeb5431090d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -372,7 +373,8 @@ public void testAllTypeStatistics() throws Exception { DataTypes.STRING(), DataTypes.BINARY(20), DataTypes.VARBINARY(20), - DataTypes.DATE() + DataTypes.DATE(), + DataTypes.TIMESTAMP(6) }, new String[] { "v_int", @@ -385,7 +387,8 @@ public void testAllTypeStatistics() throws Exception { "v_varchar", "v_binary", "v_varbinary", - "v_date" + "v_date", + "v_timestamp" }); FileStoreTable table = createPaimonTable(rowType, Collections.emptyList(), Collections.emptyList(), -1); @@ -406,7 +409,8 @@ public void testAllTypeStatistics() throws Exception { BinaryString.fromString("cat"), "B_apple".getBytes(), "B_cat".getBytes(), - 100); + 100, + Timestamp.fromEpochMillis(1060328130123L, 256000)); write.write(lowerBounds); GenericRow upperBounds = GenericRow.of( @@ -420,7 +424,8 @@ public void testAllTypeStatistics() throws Exception { BinaryString.fromString("dog"), "B_banana".getBytes(), "B_dog".getBytes(), - 200); + 200, + Timestamp.fromEpochMillis(1723486530123L, 456000)); write.write(upperBounds); commit.commit(1, write.prepareCommit(false, 1)); @@ -448,6 +453,9 @@ public void testAllTypeStatistics() throws Exception { } else if (type.getTypeRoot() == DataTypeRoot.DECIMAL) { lower = new BigDecimal(lowerBounds.getField(i).toString()); upper = new BigDecimal(upperBounds.getField(i).toString()); + } else if (type.getTypeRoot() == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { + lower = ((Timestamp) lowerBounds.getField(i)).toString(); + upper = ((Timestamp) upperBounds.getField(i)).toString(); } else { lower = lowerBounds.getField(i); upper = upperBounds.getField(i); @@ -459,16 +467,18 @@ public void testAllTypeStatistics() throws Exception { expectedLower = LocalDate.ofEpochDay((int) lower).toString(); expectedUpper = LocalDate.ofEpochDay((int) upper).toString(); } - - assertThat( - getIcebergResult( - icebergTable -> - IcebergGenerics.read(icebergTable) - .select(name) - .where(Expressions.lessThan(name, upper)) - .build(), - Record::toString)) - .containsExactly("Record(" + expectedLower + ")"); + if (type.getTypeRoot() != DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) { + // todo iceberg lessthan has bug + assertThat( + getIcebergResult( + icebergTable -> + IcebergGenerics.read(icebergTable) + .select(name) + .where(Expressions.lessThan(name, upper)) + .build(), + Record::toString)) + .containsExactly("Record(" + expectedLower + ")"); + } assertThat( getIcebergResult( icebergTable -> @@ -614,6 +624,76 @@ public void testPartitionedPrimaryKeyTable() throws Exception { Record::toString); } + @Test + public void testPartitionedPrimaryKeyTableTimestamp() throws Exception { + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.TIMESTAMP(6), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.INT(), + DataTypes.BIGINT() + }, + new String[] {"pt1", "pt2", "k", "v1", "v2"}); + + BiFunction binaryRow = + (pt1, pt2) -> { + BinaryRow b = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeTimestamp(0, pt1, 6); + writer.writeString(1, BinaryString.fromString(pt2)); + writer.complete(); + return b; + }; + + int numRounds = 20; + int numRecords = 100; + ThreadLocalRandom random = ThreadLocalRandom.current(); + + List> testRecords = new ArrayList<>(); + List> expected = new ArrayList<>(); + Map expectedMap = new LinkedHashMap<>(); + + for (int r = 0; r < numRounds; r++) { + List round = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + Timestamp pt1 = generateRandomTimestamp(random, random.nextBoolean() ? 3 : 6); + String pt2 = String.valueOf(random.nextInt(10, 12)); + String k = String.valueOf(random.nextInt(0, 100)); + int v1 = random.nextInt(); + long v2 = random.nextLong(); + + round.add( + new TestRecord( + binaryRow.apply(pt1, pt2), + GenericRow.of( + pt1, + BinaryString.fromString(pt2), + BinaryString.fromString(k), + v1, + v2))); + + expectedMap.put( + String.format("%s, %s, %s", pt1, pt2, k), String.format("%d, %d", v1, v2)); + } + + testRecords.add(round); + expected.add( + expectedMap.entrySet().stream() + .map(e -> String.format("Record(%s, %s)", e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } + + runCompatibilityTest( + rowType, + Arrays.asList("pt1", "pt2"), + Arrays.asList("pt1", "pt2", "k"), + testRecords, + expected, + Record::toString); + } + private void runCompatibilityTest( RowType rowType, List partitionKeys, @@ -726,4 +806,21 @@ private List getIcebergResult( result.close(); return actual; } + + private Timestamp generateRandomTimestamp(ThreadLocalRandom random, int precision) { + long milliseconds = random.nextLong(0, Long.MAX_VALUE / 1000_000 - 1_000 * 1000); + int nanoAdjustment; + switch (precision) { + case 3: + nanoAdjustment = 0; + break; + case 6: + nanoAdjustment = random.nextInt(0, 1_000) * 1000; + break; + default: + throw new IllegalArgumentException("Unsupported precision: " + precision); + } + + return Timestamp.fromEpochMillis(milliseconds, nanoAdjustment); + } }