Commit b557f69
1. Extract common convert method 2. Merge tests
dbac committed Aug 5, 2024
1 parent 35e60b0 commit b557f69
Showing 3 changed files with 160 additions and 230 deletions.

@@ -47,7 +47,6 @@ private IcebergConversions() {}
     public static ByteBuffer toByteBuffer(DataType type, Object value) {
         int precision;
         Timestamp timestamp;
-        long timestampValue = 0;
         switch (type.getTypeRoot()) {
             case BOOLEAN:
                 return ByteBuffer.allocate(1).put(0, (Boolean) value ? (byte) 0x01 : (byte) 0x00);
@@ -81,34 +80,31 @@ public static ByteBuffer toByteBuffer(DataType type, Object value) {
                 Decimal decimal = (Decimal) value;
                 return ByteBuffer.wrap((decimal.toUnscaledBytes()));
             case TIMESTAMP_WITHOUT_TIME_ZONE:
-                final TimestampType timestampType = (TimestampType) type;
+                TimestampType timestampType = (TimestampType) type;
                 precision = timestampType.getPrecision();
                 timestamp = (Timestamp) value;
-                if (precision <= 6) {
-                    timestampValue = timestamp.getMillisecond();
-                } else if (precision > 6) {
-                    timestampValue = timestamp.getNanoOfMillisecond();
-                }
-                return ByteBuffer.allocate(8)
-                        .order(ByteOrder.LITTLE_ENDIAN)
-                        .putLong(timestampValue);
+                return convertTimestampWithPrecisionToBuffer(timestamp, precision);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                final LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type;
+                LocalZonedTimestampType localTimestampType = (LocalZonedTimestampType) type;
                 precision = localTimestampType.getPrecision();
                 timestamp = (Timestamp) value;
-                if (precision <= 6) {
-                    timestampValue = timestamp.getMillisecond();
-                } else if (precision > 6) {
-                    timestampValue = timestamp.getNanoOfMillisecond();
-                }
-                return ByteBuffer.allocate(8)
-                        .order(ByteOrder.LITTLE_ENDIAN)
-                        .putLong(timestampValue);
+                return convertTimestampWithPrecisionToBuffer(timestamp, precision);
             case TIME_WITHOUT_TIME_ZONE:
                 Long time = ((Integer) value).longValue();
                 return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, time);
             default:
                 throw new UnsupportedOperationException("Cannot serialize type: " + type);
         }
     }
+
+    private static ByteBuffer convertTimestampWithPrecisionToBuffer(
+            Timestamp timestamp, int precision) {
+        long timestampValue;
+        if (precision <= 6) {
+            timestampValue = timestamp.getMillisecond();
+        } else {
+            timestampValue = timestamp.getNanoOfMillisecond();
+        }
+        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(timestampValue);
+    }
 }
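
The extracted convertTimestampWithPrecisionToBuffer keeps the behavior previously duplicated across the two timestamp cases: for precision <= 6 it encodes the timestamp's millisecond value, for higher precisions the nano-of-millisecond value, in both cases as a little-endian long in an 8-byte buffer. A minimal plain-JDK sketch of that encoding (class and method names here are illustrative, not part of the commit):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ConversionSketch {

    // Mirrors the extracted helper: choose the long payload from the precision,
    // then write it as a little-endian 8-byte value.
    static ByteBuffer encode(long millisecond, long nanoOfMillisecond, int precision) {
        long value = precision <= 6 ? millisecond : nanoOfMillisecond;
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, value);
    }

    public static void main(String[] args) {
        // 2024-08-05T00:00:00Z as epoch milliseconds, precision 6.
        ByteBuffer buf = encode(1_722_816_000_000L, 0, 6);
        // Reading the long back confirms the little-endian layout round-trips.
        System.out.println(buf.getLong(0)); // prints 1722816000000
    }
}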

@@ -21,11 +21,7 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.*;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -36,6 +32,7 @@
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.DateTimeUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -48,6 +45,7 @@

 import java.nio.ByteBuffer;
 import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -271,6 +269,101 @@ public void testAppendOnlyTableWithAllTypes() throws Exception {
                 r -> "");
     }
 
+    @Test
+    public void testPartitionedPrimaryKeyTable_Timestamp() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.TIMESTAMP(),
+                            DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+        BiFunction<Timestamp, Timestamp, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeTimestamp(0, pt1, 6);
+                    writer.writeTimestamp(1, pt2, 6);
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
+            Timestamp pt2 =
+                    DateTimeUtils.timestampToTimestampWithLocalZone(pt1, DateTimeUtils.UTC_ZONE);
+            String k = String.valueOf(random.nextInt(0, 100));
+            int v1 = random.nextInt();
+            long v2 = random.nextLong();
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt1, pt2),
+                            String.format("%s|%s|%s", pt1, pt2, k),
+                            String.format("%d|%d", v1, v2),
+                            GenericRow.of(pt1, pt2, BinaryString.fromString(k), v1, v2)));
+        }
+
+        runCompatibilityTestForTimeAndTimeStamp(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords);
+    }
+
+    @Test
+    public void testPartitionedPrimaryKeyTable_Time() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.TIMESTAMP(),
+                            DataTypes.TIME(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"pt1", "pt2", "k", "v1", "v2"});
+
+        BiFunction<Timestamp, LocalTime, BinaryRow> binaryRow =
+                (pt1, pt2) -> {
+                    BinaryRow b = new BinaryRow(2);
+                    BinaryRowWriter writer = new BinaryRowWriter(b);
+                    writer.writeTimestamp(0, pt1, 6);
+                    writer.writeInt(1, pt2.getNano());
+                    writer.complete();
+                    return b;
+                };
+
+        int numRecords = 1000;
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        List<TestRecord> testRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            Timestamp pt1 = Timestamp.fromEpochMillis(random.nextInt(0, 99999));
+            LocalTime pt2 = LocalTime.ofNanoOfDay(LocalTime.now().getNano() + random.nextInt(1000));
+            String k = String.valueOf(random.nextInt(0, 100));
+            int v1 = random.nextInt();
+            long v2 = random.nextLong();
+            testRecords.add(
+                    new TestRecord(
+                            binaryRow.apply(pt1, pt2),
+                            String.format("%s|%s|%s", pt1.getMillisecond(), pt2.getNano(), k),
+                            String.format("%d|%d", v1, v2),
+                            GenericRow.of(pt1, pt2.getNano(), BinaryString.fromString(k), v1, v2)));
+        }
+
+        runCompatibilityTestForTimeAndTimeStamp(
+                rowType,
+                Arrays.asList("pt1", "pt2"),
+                Arrays.asList("pt1", "pt2", "k"),
+                testRecords);
+    }
+
     private void runCompatibilityTest(
             RowType rowType,
             List<String> partitionKeys,
@@ -332,6 +425,53 @@ private void runCompatibilityTest(
         assertThat(actual).isEqualTo(expected);
     }
 
+    private void runCompatibilityTestForTimeAndTimeStamp(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<TestRecord> testRecords)
+            throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        Options options = new Options();
+        if (!primaryKeys.isEmpty()) {
+            options.set(CoreOptions.BUCKET, 2);
+        }
+        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path);
+        paimonCatalog.createDatabase("mydb", false);
+        Identifier paimonIdentifier = Identifier.create("mydb", "t");
+        paimonCatalog.createTable(paimonIdentifier, schema, false);
+        FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        Map<String, String> expected = new HashMap<>();
+        for (TestRecord testRecord : testRecords) {
+            expected.put(testRecord.key, testRecord.value);
+            write.write(testRecord.record);
+        }
+
+        if (!primaryKeys.isEmpty()) {
+            for (BinaryRow partition :
+                    testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
+                for (int b = 0; b < 2; b++) {
+                    write.compact(partition, b, true);
+                }
+            }
+        }
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+        commit.close();
+    }
+
     private static class TestRecord {
         private final BinaryRow partition;
         private final String key;
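
Stripped of the catalog setup, the new shared helper exercises Paimon's standard write lifecycle: buffer the rows, force a full compaction of each bucket for primary-key tables, then commit once. A condensed sketch of that core, reusing identifiers from the diff (assumes the same fixtures and imports, with table and testRecords prepared as above; not a drop-in method):

// Condensed from runCompatibilityTestForTimeAndTimeStamp above.
String commitUser = UUID.randomUUID().toString();
TableWriteImpl<?> write = table.newWrite(commitUser);
TableCommitImpl commit = table.newCommit(commitUser);

// 1. Buffer every generated row.
for (TestRecord record : testRecords) {
    write.write(record.record);
}

// 2. Primary-key tables are created with bucket = 2 above, so trigger a full
//    compaction of both buckets in every partition before committing.
for (BinaryRow partition :
        testRecords.stream().map(t -> t.partition).collect(Collectors.toSet())) {
    for (int bucket = 0; bucket < 2; bucket++) {
        write.compact(partition, bucket, true);
    }
}

// 3. Commit everything as snapshot 1 and release resources.
commit.commit(1, write.prepareCommit(true, 1));
write.close();
commit.close();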