diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
index 0a3a0932f4cf..41d4dd376848 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergConversions.java
@@ -47,7 +47,6 @@ private IcebergConversions() {}
     public static ByteBuffer toByteBuffer(DataType type, Object value) {
         int precision;
         Timestamp timestamp;
-        long timestampValue = 0;
         switch (type.getTypeRoot()) {
             case BOOLEAN:
                 return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00);
@@ -81,29 +80,15 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
                 Decimal decimal = (Decimal) value;
                 return ByteBuffer.wrap((decimal.toUnscaledBytes()));
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                final TimestampType timestampType = (TimestampType) type;
+                TimestampType timestampType = (TimestampType) type;
                 precision = timestampType.getPrecision();
                 timestamp = (Timestamp) value;
-                if (precision <= 6) {
-                    timestampValue = timestamp.getMillisecond();
-                } else if (precision > 6) {
-                    timestampValue = timestamp.getNanoOfMillisecond();
-                }
-                return ByteBuffer.allocate(8)
-                        .order(ByteOrder.LITTLE_ENDIAN)
-                        .putLong(timestampValue);
+                return convertTimestampWithPrecisionToBuffer(timestamp, precision);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                final LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type;
+                LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type;
                 precision = localTimestampType.getPrecision();
                 timestamp = (Timestamp) value;
-                if (precision <= 6) {
-                    timestampValue = timestamp.getMillisecond();
-                } else if (precision > 6) {
-                    timestampValue = timestamp.getNanoOfMillisecond();
-                }
-                return ByteBuffer.allocate(8)
-                        .order(ByteOrder.LITTLE_ENDIAN)
-                        .putLong(timestampValue);
+                return convertTimestampWithPrecisionToBuffer(timestamp, precision);
             case TIME_WITHOUT_TIME_ZONE:
                 Long time = ((Integer) value).longValue();
                 return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time);
@@ -111,4 +96,15 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
             throw new UnsupportedOperationException("Cannot serialize type: " + type);
         }
     }
+
+    private static ByteBuffer convertTimestampWithPrecisionToBuffer(
+            Timestamp timestamp, int precision) {
+        long timestampValue;
+        if (precision <= 6) {
+            timestampValue = timestamp.getMillisecond();
+        } else {
+            timestampValue = timestamp.getNanoOfMillisecond();
+        }
+        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(timestampValue);
+    }
 }
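
Note: both timestamp branches above now share convertTimestampWithPrecisionToBuffer. A minimal round-trip sketch of the helper's contract (illustrative only, not part of this patch; it assumes DataTypes.TIMESTAMP() keeps Paimon's default precision of 6, so the getMillisecond() branch is taken):

    // Encode a value the way IcebergConversions now does, then read the
    // 8-byte long back; the returned buffer is little-endian.
    Timestamp ts = Timestamp.fromEpochMillis(1234L);
    ByteBuffer buf = IcebergConversions.toByteBuffer(DataTypes.TIMESTAMP(), ts);
    assert buf.getLong(0) == ts.getMillisecond();
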
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 668d91d44bdf..af1ad141542d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -21,11 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.*;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -36,6 +32,7 @@
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -48,6 +45,7 @@
 import java.nio.ByteBuffer;
 import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -271,6 +269,101 @@ public void testAppendOnlyTableWithAllTypes() throws Exception {
                 r -> "");
     }
 
+    @Test
+    public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.TIMESTAMP(),
+                            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+        BiFunction<Timestamp, Timestamp, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeTimestamp(0, pt1, 6);
+                    writer.writeTimestamp(1, pt2, 6);
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
+            Timestamp pt2 =
+                    DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE);
+            String k = String.valueOf(random.nextInt(0, 100));
+            int v1 = random.nextInt();
+            long v2 = random.nextLong();
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt1, pt2),
+                            String.format("%s|%s|%s", pt1, pt2, k),
+                            String.format("%d|%d", v1, v2),
+                            GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2)));
+        }
+
+        runCompatibilityTestForTimeAndTimeStamp(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords);
+    }
+
+    @Test
+    public void testPartitionedPrimaryKeyTable_Time() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.TIMESTAMP(),
+                            DataTypes.TIME(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+        BiFunction<Timestamp, LocalTime, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeTimestamp(0, pt1, 6);
+                    writer.writeInt(1, pt2.getNano());
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
+            LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000));
+            String k = String.valueOf(random.nextInt(0, 100));
+            int v1 = random.nextInt();
+            long v2 = random.nextLong();
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt1, pt2),
+                            String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k),
+                            String.format("%d|%d", v1, v2),
+                            GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2)));
+        }
+
+        runCompatibilityTestForTimeAndTimeStamp(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords);
+    }
+
     private void runCompatibilityTest(
             RowType rowType,
             List<String> partitionKeys,
             List<String> primaryKeys,
             List<TestRecord> testRecords)
@@ -332,6 +425,53 @@ private void runCompatibilityTest(
         assertThat(actual).isEqualTo(expected);
     }
 
+    private void runCompatibilityTestForTimeAndTimeStamp(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<TestRecord> testRecords)
+            throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        Options options = new Options();
+        if (!primaryKeys.isEmpty()) {
+            options.set(CoreOptions.BUCKET, 2);
+        }
+        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
+        paimonCatalog.createDatabase("mydb", false);
+        Identifier paimonIdentifier = Identifier.create("mydb", "t");
+        paimonCatalog.createTable(paimonIdentifier, schema, false);
+        FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        Map<String, String> expected = new HashMap<>();
+        for (TestRecord testRecord : testRecords) {
+            expected.put(testRecord.key, testRecord.value);
+            write.write(testRecord.record);
+        }
+
+        if (!primaryKeys.isEmpty()) {
+            for (BinaryRow partition :
+                    testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
+                for (int b = 0; b < 2; b++) {
+                    write.compact(partition, b, true);
+                }
+            }
+        }
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+        commit.close();
+    }
+
     private static class TestRecord {
         private final BinaryRow partition;
         private final String key;
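
Note: the moved tests derive pt2 from pt1 with DateTimeUtils. The property they rely on, sketched in isolation (illustrative only; assumes that with UTC as the conversion zone, timestampToTimestampWithLocalZone preserves epoch millis):

    // With UTC the local-zone conversion should be the identity on epoch
    // millis, so pt1 and pt2 describe the same instant.
    Timestamp pt1 = Timestamp.fromEpochMillis(54321L);
    Timestamp pt2 =
            DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE);
    assert pt2.getMillisecond() == pt1.getMillisecond();
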
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java
deleted file mode 100644
index d4db7ad0186e..000000000000
--- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergDataTypeCompatibilityTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.iceberg;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.FileSystemCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.*;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.paimon.table.sink.TableWriteImpl;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.DateTimeUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.time.LocalTime;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-/** Tests for Iceberg compatibility. */
-public class IcebergDataTypeCompatibilityTest {
-
-    @TempDir java.nio.file.Path tempDir;
-
-    @Test
-    public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception {
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.TIMESTAMP(),
-                            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
-                            DataTypes.STRING(),
-                            DataTypes.INT(),
-                            DataTypes.BIGINT()
-                        },
-                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
-
-        BiFunction<Timestamp, Timestamp, BinaryRow> binaryRow =
-                (pt1, pt2) -> {
-                    BinaryRow b = new BinaryRow(2);
-                    BinaryRowWriter writer = new BinaryRowWriter(b);
-                    writer.writeTimestamp(0, pt1, 6);
-                    writer.writeTimestamp(1, pt2, 6);
-                    writer.complete();
-                    return b;
-                };
-
-        int numRecords = 1000;
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        List<TestRecord> testRecords = new ArrayList<>();
-        for (int i = 0; i < numRecords; i++) {
-            Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
-            Timestamp pt2 =
-                    DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE);
-            String k = String.valueOf(random.nextInt(0, 100));
-            int v1 = random.nextInt();
-            long v2 = random.nextLong();
-            testRecords.add(
-                    new TestRecord(
-                            binaryRow.apply(pt1, pt2),
-                            String.format("%s|%s|%s", pt1, pt2, k),
-                            String.format("%d|%d", v1, v2),
-                            GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2)));
-        }
-
-        runCompatibilityTest(
-                rowType,
-                Arrays.asList("pt1", "pt2"),
-                Arrays.asList("pt1", "pt2", "k"),
-                testRecords);
-    }
-
-    @Test
-    public void testPartitionedPrimaryKeyTable_Time() throws Exception {
-        RowType rowType =
-                RowType.of(
-                        new DataType[] {
-                            DataTypes.TIMESTAMP(),
-                            DataTypes.TIME(),
-                            DataTypes.STRING(),
-                            DataTypes.INT(),
-                            DataTypes.BIGINT()
-                        },
-                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
-
-        BiFunction<Timestamp, LocalTime, BinaryRow> binaryRow =
-                (pt1, pt2) -> {
-                    BinaryRow b = new BinaryRow(2);
-                    BinaryRowWriter writer = new BinaryRowWriter(b);
-                    writer.writeTimestamp(0, pt1, 6);
-                    writer.writeInt(1, pt2.getNano());
-                    writer.complete();
-                    return b;
-                };
-
-        int numRecords = 1000;
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        List<TestRecord> testRecords = new ArrayList<>();
-        for (int i = 0; i < numRecords; i++) {
-            Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
-            LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000));
-            String k = String.valueOf(random.nextInt(0, 100));
-            int v1 = random.nextInt();
-            long v2 = random.nextLong();
-            testRecords.add(
-                    new TestRecord(
-                            binaryRow.apply(pt1, pt2),
-                            String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k),
-                            String.format("%d|%d", v1, v2),
-                            GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2)));
-        }
-
-        runCompatibilityTest(
-                rowType,
-                Arrays.asList("pt1", "pt2"),
-                Arrays.asList("pt1", "pt2", "k"),
-                testRecords);
-    }
-
-    private void runCompatibilityTest(
-            RowType rowType,
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            List<TestRecord> testRecords)
-            throws Exception {
-        LocalFileIO fileIO = LocalFileIO.create();
-        Path path = new Path(tempDir.toString());
-
-        Options options = new Options();
-        if (!primaryKeys.isEmpty()) {
-            options.set(CoreOptions.BUCKET, 2);
-        }
-        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
-        options.set(CoreOptions.FILE_FORMAT, "avro");
-        Schema schema =
-                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
-
-        FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
-        paimonCatalog.createDatabase("mydb", false);
-        Identifier paimonIdentifier = Identifier.create("mydb", "t");
-        paimonCatalog.createTable(paimonIdentifier, schema, false);
-        FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
-
-        String commitUser = UUID.randomUUID().toString();
-        TableWriteImpl<?> write = table.newWrite(commitUser);
-        TableCommitImpl commit = table.newCommit(commitUser);
-
-        Map<String, String> expected = new HashMap<>();
-        for (TestRecord testRecord : testRecords) {
-            expected.put(testRecord.key, testRecord.value);
-            write.write(testRecord.record);
-        }
-
-        if (!primaryKeys.isEmpty()) {
-            for (BinaryRow partition :
-                    testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
-                for (int b = 0; b < 2; b++) {
-                    write.compact(partition, b, true);
-                }
-            }
-        }
-        commit.commit(1, write.prepareCommit(true, 1));
-        write.close();
-        commit.close();
-    }
-
-    private static class TestRecord {
-        private final BinaryRow partition;
-        private final String key;
-        private final String value;
-        private final GenericRow record;
-
-        private TestRecord(BinaryRow partition, String key, String value, GenericRow record) {
-            this.partition = partition;
-            this.key = key;
-            this.value = value;
-            this.record = record;
-        }
-    }
-}
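
Note: the _Time test stores pt2 as an int of nanoseconds, which exercises the untouched TIME_WITHOUT_TIME_ZONE branch of IcebergConversions (the Integer is widened to a long and written as 8 little-endian bytes at offset 0). A hypothetical spot check, not included in this patch:

    ByteBuffer buf = IcebergConversions.toByteBuffer(DataTypes.TIME(), 1000);
    assert buf.getLong(0) == 1000L;
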