Skip to content

Commit

Permalink
[hive] Use new ReadBuilder and WriteBuilder API in tests of paimon-hive module (apache#718)
Browse files Browse the repository at this point in the history
  • Loading branch information
TyrantLucifer authored Mar 31, 2023
1 parent 07f24d9 commit 224a1ab
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,42 @@

package org.apache.paimon.hive;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;

import java.util.List;

import static org.apache.paimon.CoreOptions.PATH;

/** Test utils related to {@link FileStore}. */
public class FileStoreTestUtils {

public static FileStoreTable createFileStoreTable(
private static final String TABLE_NAME = "hive_test_table";

private static final String DATABASE_NAME = "default";

private static final Identifier TABLE_IDENTIFIER = Identifier.create(DATABASE_NAME, TABLE_NAME);

public static Table createFileStoreTable(
Options conf, RowType rowType, List<String> partitionKeys, List<String> primaryKeys)
throws Exception {
Path tablePath = CoreOptions.path(conf);
new SchemaManager(LocalFileIO.create(), tablePath)
.createTable(
new Schema(
rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""));

// only path, other config should be read from file store.
conf = new Options();
conf.set(PATH, tablePath.toString());
return FileStoreTableFactory.create(LocalFileIO.create(), conf);
// create CatalogContext using the options
CatalogContext catalogContext = CatalogContext.create(conf);
Catalog catalog = CatalogFactory.createCatalog(catalogContext);
// create database
catalog.createDatabase(DATABASE_NAME, false);
// create table
catalog.createTable(
TABLE_IDENTIFIER,
new Schema(rowType.getFields(), partitionKeys, primaryKeys, conf.toMap(), ""),
false);
Table table = catalog.getTable(TABLE_IDENTIFIER);
catalog.close();
return table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.objectinspector.PaimonObjectInspectorFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
Expand Down Expand Up @@ -601,35 +603,37 @@ private String createChangelogExternalTable(
List<InternalRow> data)
throws Exception {
String path = folder.newFolder().toURI().toString();
String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
conf.set(CoreOptions.PATH, path);
conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);

return writeData(table, path, data);
return writeData(table, tablePath, data);
}

private String createAppendOnlyExternalTable(
RowType rowType, List<String> partitionKeys, List<InternalRow> data) throws Exception {
String path = folder.newFolder().toURI().toString();
String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
conf.set(CoreOptions.PATH, path);
conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.BUCKET, 2);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf, rowType, partitionKeys, Collections.emptyList());

return writeData(table, path, data);
return writeData(table, tablePath, data);
}

private String writeData(FileStoreTable table, String path, List<InternalRow> data)
throws Exception {
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
private String writeData(Table table, String path, List<InternalRow> data) throws Exception {
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
for (InternalRow rowData : data) {
write.write(rowData);
if (ThreadLocalRandom.current().nextInt(5) == 0) {
Expand All @@ -655,10 +659,11 @@ private String writeData(FileStoreTable table, String path, List<InternalRow> da
@Test
public void testReadAllSupportedTypes() throws Exception {
String root = folder.newFolder().toString();
String tablePath = String.format("%s/default.db/hive_test_table", root);
Options conf = new Options();
conf.set(CoreOptions.PATH, root);
conf.set(CatalogOptions.WAREHOUSE, root);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RandomGenericRowDataGenerator.ROW_TYPE,
Expand All @@ -678,8 +683,9 @@ public void testReadAllSupportedTypes() throws Exception {
}
}

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
for (GenericRow rowData : input) {
write.write(rowData);
}
Expand All @@ -692,7 +698,7 @@ public void testReadAllSupportedTypes() throws Exception {
Arrays.asList(
"CREATE EXTERNAL TABLE test_table",
"STORED BY '" + PaimonStorageHandler.class.getName() + "'",
"LOCATION '" + root + "'")));
"LOCATION '" + tablePath + "'")));
List<Object[]> actual =
hiveShell.executeStatement("SELECT * FROM test_table WHERE f_int > 0");

Expand Down Expand Up @@ -771,10 +777,11 @@ public void testReadAllSupportedTypes() throws Exception {
@Test
public void testPredicatePushDown() throws Exception {
String path = folder.newFolder().toURI().toString();
String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
conf.set(CoreOptions.PATH, path);
conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(new DataType[] {DataTypes.INT()}, new String[] {"a"}),
Expand All @@ -783,8 +790,9 @@ public void testPredicatePushDown() throws Exception {

// TODO add NaN related tests

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1));
commit.commit(0, write.prepareCommit(true, 0));
write.write(GenericRow.of((Object) null));
Expand All @@ -805,7 +813,7 @@ public void testPredicatePushDown() throws Exception {
Arrays.asList(
"CREATE EXTERNAL TABLE test_table",
"STORED BY '" + PaimonStorageHandler.class.getName() + "'",
"LOCATION '" + path + "'")));
"LOCATION '" + tablePath + "'")));
Assert.assertEquals(
Arrays.asList("1", "5"),
hiveShell.executeQuery("SELECT * FROM test_table WHERE a = 1 OR a = 5"));
Expand Down Expand Up @@ -860,10 +868,11 @@ public void testPredicatePushDown() throws Exception {
@Test
public void testDateAndTimestamp() throws Exception {
String path = folder.newFolder().toURI().toString();
String tablePath = String.format("%s/default.db/hive_test_table", path);
Options conf = new Options();
conf.set(CoreOptions.PATH, path);
conf.set(CatalogOptions.WAREHOUSE, path);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
Expand All @@ -872,8 +881,9 @@ public void testDateAndTimestamp() throws Exception {
Collections.emptyList(),
Collections.emptyList());

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(
GenericRow.of(
375, /* 1971-01-11 */
Expand All @@ -897,7 +907,7 @@ public void testDateAndTimestamp() throws Exception {
Arrays.asList(
"CREATE EXTERNAL TABLE test_table",
"STORED BY '" + PaimonStorageHandler.class.getName() + "'",
"LOCATION '" + path + "'")));
"LOCATION '" + tablePath + "'")));
Assert.assertEquals(
Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.1"),
hiveShell.executeQuery("SELECT * FROM test_table WHERE dt = '1971-01-11'"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,20 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.hive.FileStoreTestUtils;
import org.apache.paimon.hive.RowDataContainer;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

Expand All @@ -46,27 +49,20 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link PaimonRecordReader}. */
public class PaimonRecordReaderTest {

@TempDir java.nio.file.Path tempDir;
private String commitUser;

@BeforeEach
public void beforeEach() {
commitUser = UUID.randomUUID().toString();
}

@Test
public void testPk() throws Exception {
Options conf = new Options();
conf.set(CoreOptions.PATH, tempDir.toString());
conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
Expand All @@ -75,8 +71,9 @@ public void testPk() throws Exception {
Collections.emptyList(),
Collections.singletonList("a"));

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1L, BinaryString.fromString("Hi")));
write.write(GenericRow.of(2L, BinaryString.fromString("Hello")));
write.write(GenericRow.of(3L, BinaryString.fromString("World")));
Expand All @@ -102,9 +99,9 @@ public void testPk() throws Exception {
@Test
public void testValueCount() throws Exception {
Options conf = new Options();
conf.set(CoreOptions.PATH, tempDir.toString());
conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
Expand All @@ -113,8 +110,9 @@ public void testValueCount() throws Exception {
Collections.emptyList(),
Collections.emptyList());

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1, BinaryString.fromString("Hi")));
write.write(GenericRow.of(2, BinaryString.fromString("Hello")));
write.write(GenericRow.of(3, BinaryString.fromString("World")));
Expand All @@ -141,9 +139,9 @@ public void testValueCount() throws Exception {
@Test
public void testProjectionPushdown() throws Exception {
Options conf = new Options();
conf.set(CoreOptions.PATH, tempDir.toString());
conf.set(CatalogOptions.WAREHOUSE, tempDir.toString());
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
FileStoreTable table =
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
RowType.of(
Expand All @@ -154,8 +152,9 @@ public void testProjectionPushdown() throws Exception {
Collections.emptyList(),
Collections.emptyList());

StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
write.write(GenericRow.of(2, 20L, BinaryString.fromString("Hello")));
write.write(GenericRow.of(1, 10L, BinaryString.fromString("Hi")));
Expand All @@ -176,20 +175,20 @@ public void testProjectionPushdown() throws Exception {
assertThat(actual).isEqualTo(expected);
}

private PaimonRecordReader read(FileStoreTable table, BinaryRow partition, int bucket)
throws Exception {
return read(table, partition, bucket, table.schema().fieldNames());
private PaimonRecordReader read(Table table, BinaryRow partition, int bucket) throws Exception {
return read(table, partition, bucket, ((FileStoreTable) table).schema().fieldNames());
}

private PaimonRecordReader read(
FileStoreTable table, BinaryRow partition, int bucket, List<String> selectedColumns)
Table table, BinaryRow partition, int bucket, List<String> selectedColumns)
throws Exception {
for (DataSplit split : table.newScan().plan().splits) {
if (split.partition().equals(partition) && split.bucket() == bucket) {
for (Split split : table.newReadBuilder().newScan().plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
if (dataSplit.partition().equals(partition) && dataSplit.bucket() == bucket) {
return new PaimonRecordReader(
table.newReadBuilder(),
new PaimonInputSplit(tempDir.toString(), split),
table.schema().fieldNames(),
new PaimonInputSplit(tempDir.toString(), dataSplit),
((FileStoreTable) table).schema().fieldNames(),
selectedColumns);
}
}
Expand Down

0 comments on commit 224a1ab

Please sign in to comment.