IcebergConversions: add Timestamp and Time type support
dbac committed Aug 1, 2024
1 parent eda5c40 · commit 35e60b0
Showing 3 changed files with 245 additions and 0 deletions.
@@ -19,7 +19,10 @@
package org.apache.paimon.iceberg.manifest;

import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -42,6 +45,9 @@ private IcebergConversions() {}
ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);

public static ByteBuffer toByteBuffer(DataType type, Object value) {
int precision;
Timestamp timestamp;
long timestampValue = 0;
switch (type.getTypeRoot()) {
case BOOLEAN:
return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00);
@@ -74,6 +80,33 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
case DECIMAL:
Decimal decimal = (Decimal) value;
return ByteBuffer.wrap((decimal.toUnscaledBytes()));
case TIMESTAMP_WITHOUT_TIME_ZONE:
final TimestampType timestampType = (TimestampType) type;
precision = timestampType.getPrecision();
timestamp = (Timestamp) value;
// Iceberg serializes timestamps as microseconds from the epoch;
// only precisions above 3 carry sub-millisecond digits.
if (precision <= 3) {
timestampValue = timestamp.getMillisecond() * 1_000;
} else {
timestampValue =
timestamp.getMillisecond() * 1_000
+ timestamp.getNanoOfMillisecond() / 1_000;
}
return ByteBuffer.allocate(8)
.order(ByteOrder.LITTLE_ENDIAN)
.putLong(0, timestampValue);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type;
precision = localTimestampType.getPrecision();
timestamp = (Timestamp) value;
if (precision <= 3) {
timestampValue = timestamp.getMillisecond() * 1_000;
} else {
timestampValue =
timestamp.getMillisecond() * 1_000
+ timestamp.getNanoOfMillisecond() / 1_000;
}
return ByteBuffer.allocate(8)
.order(ByteOrder.LITTLE_ENDIAN)
.putLong(0, timestampValue);
case TIME_WITHOUT_TIME_ZONE:
// Paimon stores TIME as milliseconds of the day; Iceberg expects microseconds.
long time = ((Integer) value).longValue() * 1_000;
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time);
default:
throw new UnsupportedOperationException("Cannot serialize type: " + type);
}
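For context, Iceberg's single-value serialization stores both timestamps (microseconds from the epoch) and times (microseconds from midnight) as an 8-byte little-endian long. The following sketch is illustration rather than part of the commit; it uses only the JDK, with made-up sample values, to show the unit conversion and the byte layout that toByteBuffer produces:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.LocalTime;

public class IcebergValueLayoutSketch {
    public static void main(String[] args) {
        // A timestamp in epoch microseconds (2024-08-01T00:00:00Z; example value).
        long timestampMicros = 1_722_470_400_000_000L;

        // A time of day: Paimon keeps an int of milliseconds since midnight,
        // while Iceberg expects microseconds since midnight.
        int millisOfDay = (int) (LocalTime.of(13, 30, 5).toNanoOfDay() / 1_000_000);
        long timeMicros = millisOfDay * 1_000L;

        // Both kinds of value are written as an 8-byte little-endian long,
        // mirroring the ByteBuffer calls in toByteBuffer above.
        ByteBuffer buffer = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putLong(0, timestampMicros);

        // A reader using the same byte order recovers the original value.
        long decoded = ByteBuffer.wrap(buffer.array()).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
        System.out.println(decoded == timestampMicros); // true
        System.out.println(timeMicros); // 48605000000
    }
}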
@@ -130,6 +130,12 @@ private static String toTypeString(DataType dataType) {
DecimalType decimalType = (DecimalType) dataType;
return String.format(
"decimal(%d, %d)", decimalType.getPrecision(), decimalType.getScale());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return "timestamp";
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return "timestamptz";
case TIME_WITHOUT_TIME_ZONE:
return "time";
default:
throw new UnsupportedOperationException("Unsupported data type: " + dataType);
}
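In Iceberg's type strings, "timestamp" is the zone-less type and "timestamptz" is the UTC-adjusted one, which is the orientation used above. As a sanity check, Iceberg's own type objects print exactly these strings; a minimal sketch, assuming the iceberg-api artifact is on the classpath:

import org.apache.iceberg.types.Types;

public class IcebergTypeStringCheck {
    public static void main(String[] args) {
        // Iceberg's type objects print the exact strings the writer must emit.
        System.out.println(Types.TimestampType.withoutZone()); // timestamp
        System.out.println(Types.TimestampType.withZone()); // timestamptz
        System.out.println(Types.TimeType.get()); // time
    }
}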
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.iceberg;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/** Tests for Iceberg compatibility of timestamp and time data types. */
public class IcebergDataTypeCompatibilityTest {

@TempDir java.nio.file.Path tempDir;

@Test
public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.TIMESTAMP(),
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.BIGINT()
},
new String[] {"pt1", "pt2", "k", "v1", "v2"});

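// Encodes the two partition values into Paimon's BinaryRow format;
// both timestamp columns are written with precision 6 (microseconds).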
BiFunction<Timestamp, Timestamp, BinaryRow> binaryRow =
(pt1, pt2) -> {
BinaryRow b = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(b);
writer.writeTimestamp(0, pt1, 6);
writer.writeTimestamp(1, pt2, 6);
writer.complete();
return b;
};

int numRecords = 1000;
ThreadLocalRandom random = ThreadLocalRandom.current();
List<TestRecord> testRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
Timestamp pt2 =
DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE);
String k = String.valueOf(random.nextInt(0, 100));
int v1 = random.nextInt();
long v2 = random.nextLong();
testRecords.add(
new TestRecord(
binaryRow.apply(pt1, pt2),
String.format("%s|%s|%s", pt1, pt2, k),
String.format("%d|%d", v1, v2),
GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2)));
}

runCompatibilityTest(
rowType,
Arrays.asList("pt1", "pt2"),
Arrays.asList("pt1", "pt2", "k"),
testRecords);
}

@Test
public void testPartitionedPrimaryKeyTable_Time() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.TIMESTAMP(),
DataTypes.TIME(),
DataTypes.STRING(),
DataTypes.INT(),
DataTypes.BIGINT()
},
new String[] {"pt1", "pt2", "k", "v1", "v2"});

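// Encodes the partition values: a precision-6 timestamp and the time's
// nano-of-second written as the TIME column's int value.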
BiFunction<Timestamp, LocalTime, BinaryRow> binaryRow =
(pt1, pt2) -> {
BinaryRow b = new BinaryRow(2);
BinaryRowWriter writer = new BinaryRowWriter(b);
writer.writeTimestamp(0, pt1, 6);
writer.writeInt(1, pt2.getNano());
writer.complete();
return b;
};

int numRecords = 1000;
ThreadLocalRandom random = ThreadLocalRandom.current();
List<TestRecord> testRecords = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000)); // always a sub-second nano-of-day
String k = String.valueOf(random.nextInt(0, 100));
int v1 = random.nextInt();
long v2 = random.nextLong();
testRecords.add(
new TestRecord(
binaryRow.apply(pt1, pt2),
String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k),
String.format("%d|%d", v1, v2),
GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2)));
}

runCompatibilityTest(
rowType,
Arrays.asList("pt1", "pt2"),
Arrays.asList("pt1", "pt2", "k"),
testRecords);
}

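/**
* Creates a Paimon table with Iceberg-compatible metadata enabled, writes all
* test records, triggers a full compaction for every bucket of a primary-key
* table, and commits the result.
*/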
private void runCompatibilityTest(
RowType rowType,
List<String> partitionKeys,
List<String> primaryKeys,
List<TestRecord> testRecords)
throws Exception {
LocalFileIO fileIO = LocalFileIO.create();
Path path = new Path(tempDir.toString());

Options options = new Options();
if (!primaryKeys.isEmpty()) {
options.set(CoreOptions.BUCKET, 2);
}
options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
options.set(CoreOptions.FILE_FORMAT, "avro");
Schema schema =
new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");

FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
paimonCatalog.createDatabase("mydb", false);
Identifier paimonIdentifier = Identifier.create("mydb", "t");
paimonCatalog.createTable(paimonIdentifier, schema, false);
FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);

String commitUser = UUID.randomUUID().toString();
TableWriteImpl<?> write = table.newWrite(commitUser);
TableCommitImpl commit = table.newCommit(commitUser);

Map<String, String> expected = new HashMap<>();
for (TestRecord testRecord : testRecords) {
expected.put(testRecord.key, testRecord.value);
write.write(testRecord.record);
}

if (!primaryKeys.isEmpty()) {
for (BinaryRow partition :
testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
for (int b = 0; b < 2; b++) {
write.compact(partition, b, true);
}
}
}
commit.commit(1, write.prepareCommit(true, 1));
write.close();
commit.close();
}

private static class TestRecord {
private final BinaryRow partition;
private final String key;
private final String value;
private final GenericRow record;

private TestRecord(BinaryRow partition, String key, String value, GenericRow record) {
this.partition = partition;
this.key = key;
this.value = value;
this.record = record;
}
}
}
