From 419b02a836da34e2050a1d6c56a57e3ea32d7e99 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 2 Dec 2024 21:09:48 +0800 Subject: [PATCH 1/6] [iceberg] Introduce integration for AWS Glue (#4624) --- .../migration/iceberg-compatibility.md | 11 ++++++ .../AbstractIcebergCommitCallback.java | 35 +++++++++--------- .../apache/paimon/iceberg/IcebergOptions.java | 6 ++++ .../iceberg/IcebergHiveMetadataCommitter.java | 8 ++--- .../IcebergHive23MetadataCommitterITCase.java | 9 ++++- .../IcebergHive31MetadataCommitterITCase.java | 9 ++++- ...cebergHiveMetadataCommitterITCaseBase.java | 36 +++++++++++++++++++ 7 files changed, 92 insertions(+), 22 deletions(-) diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md index d74560714864..01a03a45264d 100644 --- a/docs/content/migration/iceberg-compatibility.md +++ b/docs/content/migration/iceberg-compatibility.md @@ -383,9 +383,20 @@ you also need to set some (or all) of the following table options when creating Boolean Should use the legacy manifest version to generate Iceberg's 1.4 manifest files. + +
metadata.iceberg.hive-client-class
+ org.apache.hadoop.hive.metastore.HiveMetaStoreClient + String + Hive client class name for Iceberg Hive Catalog. + +## AWS Glue Catalog + +You can use Hive Catalog to connect AWS Glue metastore, you can use set `'metadata.iceberg.hive-client-class'` to +`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`. + ## AWS Athena AWS Athena may use old manifest reader to read Iceberg manifest by names, we should let Paimon producing legacy Iceberg diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index 1b952c1716cf..7ea6cbe05777 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -112,22 +112,7 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { break; case HADOOP_CATALOG: case HIVE_CATALOG: - Path dbPath = table.location().getParent(); - final String dbSuffix = ".db"; - if (dbPath.getName().endsWith(dbSuffix)) { - String dbName = - dbPath.getName() - .substring(0, dbPath.getName().length() - dbSuffix.length()); - String tableName = table.location().getName(); - Path separatePath = - new Path( - dbPath.getParent(), - String.format("iceberg/%s/%s/metadata", dbName, tableName)); - this.pathFactory = new IcebergPathFactory(separatePath); - } else { - throw new UnsupportedOperationException( - "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse."); - } + this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table)); break; default: throw new UnsupportedOperationException( @@ -152,6 +137,24 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { this.manifestList = IcebergManifestList.create(table, pathFactory); } + public static Path catalogTableMetadataPath(FileStoreTable table) { + Path icebergDBPath = catalogDatabasePath(table); + return new Path(icebergDBPath, String.format("%s/metadata", table.location().getName())); + } + + public static Path catalogDatabasePath(FileStoreTable table) { + Path dbPath = table.location().getParent(); + final String dbSuffix = ".db"; + if (dbPath.getName().endsWith(dbSuffix)) { + String dbName = + dbPath.getName().substring(0, dbPath.getName().length() - dbSuffix.length()); + return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName)); + } else { + throw new UnsupportedOperationException( + "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse."); + } + } + @Override public void close() throws Exception {} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java index c0ceed97ba8c..4b59e29c8c33 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java @@ -84,6 +84,12 @@ public class IcebergOptions { .withDescription( "Should use the legacy manifest version to generate Iceberg's 1.4 manifest files."); + public static final ConfigOption HIVE_CLIENT_CLASS = + key("metadata.iceberg.hive-client-class") + .stringType() + .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient") + .withDescription("Hive client class name for Iceberg Hive Catalog."); + /** Where to store Iceberg metadata. 
*/ public enum StorageType implements DescribedEnum { DISABLED("disabled", "Disable Iceberg compatibility support."), diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java index d913f729e351..ddd21384cbc8 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java @@ -22,7 +22,6 @@ import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.Path; import org.apache.paimon.hive.HiveCatalog; -import org.apache.paimon.hive.HiveCatalogFactory; import org.apache.paimon.hive.HiveTypeUtils; import org.apache.paimon.hive.pool.CachedClientPool; import org.apache.paimon.options.Options; @@ -49,6 +48,8 @@ import java.util.HashMap; import java.util.stream.Collectors; +import static org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath; + /** * {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive metastore, so the table can * be visited by Iceberg's Hive catalog. @@ -98,9 +99,7 @@ public IcebergHiveMetadataCommitter(FileStoreTable table) { this.clients = new CachedClientPool( - hiveConf, - options, - HiveCatalogFactory.METASTORE_CLIENT_CLASS.defaultValue()); + hiveConf, options, options.getString(IcebergOptions.HIVE_CLIENT_CLASS)); } @Override @@ -158,6 +157,7 @@ private boolean databaseExists(String databaseName) throws Exception { private void createDatabase(String databaseName) throws Exception { Database database = new Database(); database.setName(databaseName); + database.setLocationUri(catalogDatabasePath(table).toString()); clients.execute(client -> client.createDatabase(database)); } diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java index a9e4ba945440..7d726e75a17d 100644 --- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java +++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java @@ -18,5 +18,12 @@ package org.apache.paimon.iceberg; +import org.apache.paimon.hive.CreateFailHiveMetaStoreClient; + /** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 2.3. 
*/ -public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {} +public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase { + @Override + protected String createFailHiveMetaStoreClient() { + return CreateFailHiveMetaStoreClient.class.getName(); + } +} diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java index 6f4b0afd1ae1..0634adfad357 100644 --- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java +++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java @@ -18,5 +18,12 @@ package org.apache.paimon.iceberg; +import org.apache.paimon.hive.CreateFailHiveMetaStoreClient; + /** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 3.1. */ -public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {} +public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase { + @Override + protected String createFailHiveMetaStoreClient() { + return CreateFailHiveMetaStoreClient.class.getName(); + } +} diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java index fab22775751b..d0c64c5d3b7f 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java @@ -104,6 +104,12 @@ public void testPrimaryKeyTable() throws Exception { Row.of(2, 1, "cat"), Row.of(2, 2, "elephant")), collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t ORDER BY pt, id"))); + + Assert.assertTrue( + hiveShell + .executeQuery("DESC DATABASE EXTENDED test_db") + .toString() + .contains("iceberg/test_db")); } @Test @@ -150,6 +156,36 @@ public void testAppendOnlyTable() throws Exception { "SELECT data, id, pt FROM my_iceberg.test_db.t WHERE id > 1 ORDER BY pt, id"))); } + @Test + public void testCustomMetastoreClass() { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv.executeSql( + "CREATE CATALOG my_paimon WITH ( 'type' = 'paimon', 'warehouse' = '" + + path + + "' )"); + tEnv.executeSql("CREATE DATABASE my_paimon.test_db"); + tEnv.executeSql( + String.format( + "CREATE TABLE my_paimon.test_db.t ( pt INT, id INT, data STRING ) PARTITIONED BY (pt) WITH " + + "( " + + "'metadata.iceberg.storage' = 'hive-catalog', " + + "'metadata.iceberg.uri' = '', " + + "'file.format' = 'avro', " + + "'metadata.iceberg.hive-client-class' = '%s')", + createFailHiveMetaStoreClient())); + Assert.assertThrows( + Exception.class, + () -> + tEnv.executeSql( + "INSERT INTO my_paimon.test_db.t VALUES " + + "(1, 1, 'apple'), (1, 2, 'pear'), (2, 1, 'cat'), (2, 2, 'dog')") + .await()); + } + + protected abstract String createFailHiveMetaStoreClient(); + private List collect(TableResult result) throws Exception { List rows = new ArrayList<>(); try (CloseableIterator it = result.collect()) { From 
6fb887f47f2e79f6b3142f094b20b6d7a3f86846 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 2 Dec 2024 21:11:23 +0800 Subject: [PATCH 2/6] [flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties (#4611) --- .../apache/paimon/flink/DataCatalogTable.java | 115 +++++++++++----- .../org/apache/paimon/flink/FlinkCatalog.java | 55 ++++---- .../paimon/flink/FlinkGenericCatalog.java | 6 - .../paimon/flink/SystemCatalogTable.java | 12 +- .../utils/FlinkCatalogPropertiesUtil.java | 102 ++++---------- .../utils/FlinkDescriptorProperties.java | 99 +++++++++++++ .../flink/FlinkCatalogPropertiesUtilTest.java | 130 +++++++++++++----- .../apache/paimon/flink/FlinkCatalogTest.java | 9 +- 8 files changed, 342 insertions(+), 186 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java index 019d7bd6892f..e141581b476b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java @@ -23,33 +23,55 @@ import org.apache.paimon.types.DataField; import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; -/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */ -public class DataCatalogTable extends CatalogTableImpl { +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A {@link CatalogTable} to wrap {@link FileStoreTable}. */ +public class DataCatalogTable implements CatalogTable { + // Schema of the table (column names and types) + private final Schema schema; + + // Partition keys if this is a partitioned table. 
It's an empty set if the table is not + // partitioned + private final List partitionKeys; + + // Properties of the table + private final Map options; + + // Comment of the table + private final String comment; private final Table table; private final Map nonPhysicalColumnComments; public DataCatalogTable( Table table, - TableSchema tableSchema, + Schema resolvedSchema, List partitionKeys, - Map properties, + Map options, String comment, Map nonPhysicalColumnComments) { - super(tableSchema, partitionKeys, properties, comment); + this.schema = resolvedSchema; + this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null"); + this.options = checkNotNull(options, "options cannot be null"); + + checkArgument( + options.entrySet().stream() + .allMatch(e -> e.getKey() != null && e.getValue() != null), + "properties cannot have null keys or values"); + + this.comment = comment; + this.table = table; this.nonPhysicalColumnComments = nonPhysicalColumnComments; } @@ -66,32 +88,30 @@ public Schema getUnresolvedSchema() { .filter(dataField -> dataField.description() != null) .collect(Collectors.toMap(DataField::name, DataField::description)); - return toSchema(getSchema(), columnComments); + return toSchema(schema, columnComments); } - /** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */ - private Schema toSchema(TableSchema tableSchema, Map comments) { + private Schema toSchema(Schema tableSchema, Map comments) { final Schema.Builder builder = Schema.newBuilder(); - tableSchema - .getTableColumns() + .getColumns() .forEach( column -> { - if (column instanceof TableColumn.PhysicalColumn) { - final TableColumn.PhysicalColumn c = - (TableColumn.PhysicalColumn) column; - builder.column(c.getName(), c.getType()); - } else if (column instanceof TableColumn.MetadataColumn) { - final TableColumn.MetadataColumn c = - (TableColumn.MetadataColumn) column; + if (column instanceof Schema.UnresolvedPhysicalColumn) { + final Schema.UnresolvedPhysicalColumn c = + (Schema.UnresolvedPhysicalColumn) column; + builder.column(c.getName(), c.getDataType()); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + final Schema.UnresolvedMetadataColumn c = + (Schema.UnresolvedMetadataColumn) column; builder.columnByMetadata( c.getName(), - c.getType(), - c.getMetadataAlias().orElse(null), + c.getDataType(), + c.getMetadataKey(), c.isVirtual()); - } else if (column instanceof TableColumn.ComputedColumn) { - final TableColumn.ComputedColumn c = - (TableColumn.ComputedColumn) column; + } else if (column instanceof Schema.UnresolvedComputedColumn) { + final Schema.UnresolvedComputedColumn c = + (Schema.UnresolvedComputedColumn) column; builder.columnByExpression(c.getName(), c.getExpression()); } else { throw new IllegalArgumentException( @@ -104,19 +124,16 @@ private Schema toSchema(TableSchema tableSchema, Map comments) { builder.withComment(nonPhysicalColumnComments.get(colName)); } }); - tableSchema .getWatermarkSpecs() .forEach( spec -> builder.watermark( - spec.getRowtimeAttribute(), spec.getWatermarkExpr())); - + spec.getColumnName(), spec.getWatermarkExpression())); if (tableSchema.getPrimaryKey().isPresent()) { - UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get(); - builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns()); + Schema.UnresolvedPrimaryKey primaryKey = tableSchema.getPrimaryKey().get(); + builder.primaryKeyNamed(primaryKey.getConstraintName(), primaryKey.getColumnNames()); } - return builder.build(); } @@ -124,7 
+141,7 @@ private Schema toSchema(TableSchema tableSchema, Map comments) { public CatalogBaseTable copy() { return new DataCatalogTable( table, - getSchema().copy(), + schema, new ArrayList<>(getPartitionKeys()), new HashMap<>(getOptions()), getComment(), @@ -135,10 +152,40 @@ public CatalogBaseTable copy() { public CatalogTable copy(Map options) { return new DataCatalogTable( table, - getSchema(), + schema, getPartitionKeys(), options, getComment(), nonPhysicalColumnComments); } + + @Override + public Optional getDescription() { + return Optional.of(getComment()); + } + + @Override + public Optional getDetailedDescription() { + return Optional.of("This is a catalog table in an im-memory catalog"); + } + + @Override + public boolean isPartitioned() { + return !partitionKeys.isEmpty(); + } + + @Override + public List getPartitionKeys() { + return partitionKeys; + } + + @Override + public Map getOptions() { + return options; + } + + @Override + public String getComment() { + return comment != null ? comment : ""; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index c67e79c1c06b..3a7f9790ccca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; +import org.apache.paimon.flink.utils.FlinkDescriptorProperties; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.operation.FileStoreCommit; @@ -46,7 +47,6 @@ import org.apache.paimon.view.View; import org.apache.paimon.view.ViewImpl; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -96,7 +96,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.procedures.Procedure; @@ -121,13 +120,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes; @@ -152,11 +144,18 @@ import 
static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem; import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR; import static org.apache.paimon.flink.utils.TableStatsUtil.createTableColumnStats; import static org.apache.paimon.flink.utils.TableStatsUtil.createTableStats; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -1008,18 +1007,18 @@ private static void validateAlterTable(CatalogBaseTable ct1, CatalogBaseTable ct } // materialized table is not resolved at this time. if (!table1IsMaterialized) { - org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema(); - org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema(); + org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema(); + org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema(); boolean pkEquality = false; if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) { pkEquality = Objects.equals( - ts1.getPrimaryKey().get().getType(), - ts2.getPrimaryKey().get().getType()) + ts1.getPrimaryKey().get().getConstraintName(), + ts2.getPrimaryKey().get().getConstraintName()) && Objects.equals( - ts1.getPrimaryKey().get().getColumns(), - ts2.getPrimaryKey().get().getColumns()); + ts1.getPrimaryKey().get().getColumnNames(), + ts2.getPrimaryKey().get().getColumnNames()); } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) { pkEquality = true; } @@ -1063,7 +1062,8 @@ public final void close() throws CatalogException { private CatalogBaseTable toCatalogTable(Table table) { Map newOptions = new HashMap<>(table.options()); - TableSchema.Builder builder = TableSchema.builder(); + org.apache.flink.table.api.Schema.Builder builder = + org.apache.flink.table.api.Schema.newBuilder(); Map nonPhysicalColumnComments = new HashMap<>(); // add columns @@ -1078,10 +1078,10 @@ private CatalogBaseTable toCatalogTable(Table table) { if (optionalName == null || physicalColumns.contains(optionalName)) { // build physical column from table row field RowType.RowField field = physicalRowFields.get(physicalColumnIndex++); - builder.field(field.getName(), fromLogicalToDataType(field.getType())); + builder.column(field.getName(), fromLogicalToDataType(field.getType())); } else { // build non-physical column from options - builder.add(deserializeNonPhysicalColumn(newOptions, i)); + 
deserializeNonPhysicalColumn(newOptions, i, builder); if (newOptions.containsKey(compoundKey(SCHEMA, i, COMMENT))) { nonPhysicalColumnComments.put( optionalName, newOptions.get(compoundKey(SCHEMA, i, COMMENT))); @@ -1093,22 +1093,18 @@ private CatalogBaseTable toCatalogTable(Table table) { // extract watermark information if (newOptions.keySet().stream() .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { - builder.watermark(deserializeWatermarkSpec(newOptions)); + deserializeWatermarkSpec(newOptions, builder); } // add primary keys if (table.primaryKeys().size() > 0) { - builder.primaryKey( - table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")), - table.primaryKeys().toArray(new String[0])); + builder.primaryKey(table.primaryKeys()); } - TableSchema schema = builder.build(); + org.apache.flink.table.api.Schema schema = builder.build(); // remove schema from options - DescriptorProperties removeProperties = new DescriptorProperties(false); - removeProperties.putTableSchema(SCHEMA, schema); - removeProperties.asMap().keySet().forEach(newOptions::remove); + FlinkDescriptorProperties.removeSchemaKeys(SCHEMA, schema, newOptions); Options options = Options.fromMap(newOptions); if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) { @@ -1124,7 +1120,10 @@ private CatalogBaseTable toCatalogTable(Table table) { } private CatalogMaterializedTable buildMaterializedTable( - Table table, Map newOptions, TableSchema schema, Options options) { + Table table, + Map newOptions, + org.apache.flink.table.api.Schema schema, + Options options) { String definitionQuery = options.get(MATERIALIZED_TABLE_DEFINITION_QUERY); IntervalFreshness freshness = IntervalFreshness.of( @@ -1148,7 +1147,7 @@ private CatalogMaterializedTable buildMaterializedTable( // remove materialized table related options allMaterializedTableAttributes().forEach(newOptions::remove); return CatalogMaterializedTable.newBuilder() - .schema(schema.toSchema()) + .schema(schema) .comment(table.comment().orElse("")) .partitionKeys(table.partitionKeys()) .options(newOptions) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java index 37bed2d0480f..75af5917bb49 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java @@ -48,7 +48,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FunctionDefinitionFactory; -import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.procedures.Procedure; import java.util.List; @@ -86,11 +85,6 @@ public Optional getFactory() { new FlinkGenericTableFactory(paimon.getFactory().get(), flink.getFactory().get())); } - @Override - public Optional getTableFactory() { - return flink.getTableFactory(); - } - @Override public Optional getFunctionDefinitionFactory() { return flink.getFunctionDefinitionFactory(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java index d5d843d91bb1..f88a808713c2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java @@ -22,7 +22,6 @@ import org.apache.paimon.table.system.AuditLogTable; import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.types.utils.TypeConversions; @@ -32,11 +31,11 @@ import java.util.Map; import java.util.Optional; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; /** A {@link CatalogTable} to represent system table. */ public class SystemCatalogTable implements CatalogTable { @@ -60,11 +59,8 @@ public Schema getUnresolvedSchema() { Map newOptions = new HashMap<>(table.options()); if (newOptions.keySet().stream() .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { - WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions); - return builder.watermark( - watermarkSpec.getRowtimeAttribute(), - watermarkSpec.getWatermarkExpr()) - .build(); + deserializeWatermarkSpec(newOptions, builder); + return builder.build(); } } return builder.build(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java index b0f99a6e89e4..fa84a1ca070d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java @@ -20,8 +20,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.WatermarkSpec; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.types.DataType; @@ -36,48 +35,23 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; -import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static 
org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR; /** * Utilities for ser/deserializing non-physical columns and watermark into/from a map of string * properties. */ public class FlinkCatalogPropertiesUtil { - - public static Map serializeNonPhysicalColumns( - Map indexMap, List nonPhysicalColumns) { - Map serialized = new HashMap<>(); - for (TableColumn c : nonPhysicalColumns) { - int index = indexMap.get(c.getName()); - serialized.put(compoundKey(SCHEMA, index, NAME), c.getName()); - serialized.put( - compoundKey(SCHEMA, index, DATA_TYPE), - c.getType().getLogicalType().asSerializableString()); - if (c instanceof TableColumn.ComputedColumn) { - TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn) c; - serialized.put(compoundKey(SCHEMA, index, EXPR), computedColumn.getExpression()); - } else { - TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn) c; - serialized.put( - compoundKey(SCHEMA, index, METADATA), - metadataColumn.getMetadataAlias().orElse(metadataColumn.getName())); - serialized.put( - compoundKey(SCHEMA, index, VIRTUAL), - Boolean.toString(metadataColumn.isVirtual())); - } - } - return serialized; - } + public static final String SCHEMA = "schema"; /** Serialize non-physical columns of new api. 
*/ public static Map serializeNonPhysicalNewColumns(ResolvedSchema schema) { @@ -119,22 +93,6 @@ public static Map serializeNonPhysicalNewColumns(ResolvedSchema return serialized; } - public static Map serializeWatermarkSpec(WatermarkSpec watermarkSpec) { - Map serializedWatermarkSpec = new HashMap<>(); - String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_ROWTIME), - watermarkSpec.getRowtimeAttribute()); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR), - watermarkSpec.getWatermarkExpr()); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE), - watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString()); - - return serializedWatermarkSpec; - } - public static Map serializeNewWatermarkSpec( org.apache.flink.table.catalog.WatermarkSpec watermarkSpec) { Map serializedWatermarkSpec = new HashMap<>(); @@ -219,7 +177,8 @@ private static boolean isColumnNameKey(String key) { && SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches(); } - public static TableColumn deserializeNonPhysicalColumn(Map options, int index) { + public static void deserializeNonPhysicalColumn( + Map options, int index, Schema.Builder builder) { String nameKey = compoundKey(SCHEMA, index, NAME); String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE); String exprKey = compoundKey(SCHEMA, index, EXPR); @@ -227,45 +186,42 @@ public static TableColumn deserializeNonPhysicalColumn(Map optio String virtualKey = compoundKey(SCHEMA, index, VIRTUAL); String name = options.get(nameKey); - DataType dataType = - TypeConversions.fromLogicalToDataType( - LogicalTypeParser.parse(options.get(dataTypeKey))); - TableColumn column; if (options.containsKey(exprKey)) { - column = TableColumn.computed(name, dataType, options.get(exprKey)); + final String expr = options.get(exprKey); + builder.columnByExpression(name, expr); } else if (options.containsKey(metadataKey)) { String metadataAlias = options.get(metadataKey); boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey)); - column = - metadataAlias.equals(name) - ? TableColumn.metadata(name, dataType, isVirtual) - : TableColumn.metadata(name, dataType, metadataAlias, isVirtual); + DataType dataType = + TypeConversions.fromLogicalToDataType( + LogicalTypeParser.parse( + options.get(dataTypeKey), + Thread.currentThread().getContextClassLoader())); + if (metadataAlias.equals(name)) { + builder.columnByMetadata(name, dataType, isVirtual); + } else { + builder.columnByMetadata(name, dataType, metadataAlias, isVirtual); + } } else { throw new RuntimeException( String.format( "Failed to build non-physical column. 
Current index is %s, options are %s", index, options)); } - - return column; } - public static WatermarkSpec deserializeWatermarkSpec(Map options) { + public static void deserializeWatermarkSpec( + Map options, Schema.Builder builder) { String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK); String rowtimeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_ROWTIME); String exprKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_EXPR); - String dataTypeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_DATA_TYPE); String rowtimeAttribute = options.get(rowtimeKey); String watermarkExpressionString = options.get(exprKey); - DataType watermarkExprOutputType = - TypeConversions.fromLogicalToDataType( - LogicalTypeParser.parse(options.get(dataTypeKey))); - return new WatermarkSpec( - rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType); + builder.watermark(rowtimeAttribute, watermarkExpressionString); } public static String compoundKey(Object... components) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java new file mode 100644 index 000000000000..edc73ca7bf41 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.table.api.Schema; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class for having a unified string-based representation of Table API related classes such + * as Schema, TypeInformation, etc. + * + *
<p>
Note to implementers: Please try to reuse key names as much as possible. Key-names should be + * hierarchical and lower case. Use "-" instead of dots or camel case. E.g., + * connector.schema.start-from = from-earliest. Try not to use the higher level in a key-name. E.g., + * instead of connector.kafka.kafka-version use connector.kafka.version. + * + *
<p>
Properties with key normalization enabled contain only lower-case keys. + */ +public class FlinkDescriptorProperties { + + public static final String NAME = "name"; + + public static final String DATA_TYPE = "data-type"; + + public static final String EXPR = "expr"; + + public static final String METADATA = "metadata"; + + public static final String VIRTUAL = "virtual"; + + public static final String WATERMARK = "watermark"; + + public static final String WATERMARK_ROWTIME = "rowtime"; + + public static final String WATERMARK_STRATEGY = "strategy"; + + public static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR; + + public static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE; + + public static final String PRIMARY_KEY_NAME = "primary-key.name"; + + public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns"; + + public static final String COMMENT = "comment"; + + public static void removeSchemaKeys(String key, Schema schema, Map options) { + checkNotNull(key); + checkNotNull(schema); + + List subKeys = Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL); + for (int idx = 0; idx < schema.getColumns().size(); idx++) { + for (String subKey : subKeys) { + options.remove(key + '.' + idx + '.' + subKey); + } + } + + if (!schema.getWatermarkSpecs().isEmpty()) { + subKeys = + Arrays.asList( + WATERMARK_ROWTIME, + WATERMARK_STRATEGY_EXPR, + WATERMARK_STRATEGY_DATA_TYPE); + for (int idx = 0; idx < schema.getWatermarkSpecs().size(); idx++) { + for (String subKey : subKeys) { + options.remove(key + '.' + WATERMARK + '.' + idx + '.' + subKey); + } + } + } + + schema.getPrimaryKey() + .ifPresent( + pk -> { + options.remove(key + '.' + PRIMARY_KEY_NAME); + options.remove(key + '.' + PRIMARY_KEY_COLUMNS); + }); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java index 9268a236b6cb..e32150b1fe82 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java @@ -21,27 +21,35 @@ import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.WatermarkSpec; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionVisitor; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.types.DataType; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static 
org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link FlinkCatalogPropertiesUtil}. */ @@ -49,18 +57,27 @@ public class FlinkCatalogPropertiesUtilTest { @Test public void testSerDeNonPhysicalColumns() { - Map indexMap = new HashMap<>(); - indexMap.put("comp", 2); - indexMap.put("meta1", 3); - indexMap.put("meta2", 5); - List columns = new ArrayList<>(); - columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2")); - columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10))); - columns.add(TableColumn.metadata("meta2", DataTypes.BIGINT().notNull(), "price", true)); + List columns = new ArrayList<>(); + columns.add(new Schema.UnresolvedComputedColumn("comp", new SqlCallExpression("`k` * 2"))); + columns.add( + new Schema.UnresolvedMetadataColumn("meta1", DataTypes.VARCHAR(10), null, false)); + columns.add( + new Schema.UnresolvedMetadataColumn( + "meta2", DataTypes.BIGINT().notNull(), "price", true, null)); + + List resolvedColumns = new ArrayList<>(); + resolvedColumns.add(Column.physical("phy1", DataTypes.INT())); + resolvedColumns.add(Column.physical("phy2", DataTypes.INT())); + resolvedColumns.add( + Column.computed("comp", new TestResolvedExpression("`k` * 2", DataTypes.INT()))); + resolvedColumns.add(Column.metadata("meta1", DataTypes.VARCHAR(10), null, false)); + resolvedColumns.add(Column.physical("phy3", DataTypes.INT())); + resolvedColumns.add(Column.metadata("meta2", DataTypes.BIGINT().notNull(), "price", true)); // validate serialization Map serialized = - FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns); + FlinkCatalogPropertiesUtil.serializeNonPhysicalNewColumns( + new ResolvedSchema(resolvedColumns, Collections.emptyList(), null)); Map expected = new HashMap<>(); expected.put(compoundKey(SCHEMA, 2, NAME), "comp"); @@ -80,27 +97,26 @@ public void testSerDeNonPhysicalColumns() { assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected); // validate deserialization - List deserialized = new ArrayList<>(); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2)); - 
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3)); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5)); + Schema.Builder builder = Schema.newBuilder(); + FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2, builder); + FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3, builder); + FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5, builder); - assertThat(deserialized).isEqualTo(columns); - - // validate that + assertThat(builder.build().getColumns()) + .containsExactly(columns.toArray(new Schema.UnresolvedColumn[0])); } @Test public void testSerDeWatermarkSpec() { WatermarkSpec watermarkSpec = - new WatermarkSpec( + WatermarkSpec.of( "test_time", - "`test_time` - INTERVAL '0.001' SECOND", - DataTypes.TIMESTAMP(3)); + new TestResolvedExpression( + "`test_time` - INTERVAL '0.001' SECOND", DataTypes.TIMESTAMP(3))); // validate serialization Map serialized = - FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec); + FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec(watermarkSpec); Map expected = new HashMap<>(); String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0); @@ -113,9 +129,13 @@ public void testSerDeWatermarkSpec() { assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected); // validate serialization - WatermarkSpec deserialized = - FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized); - assertThat(deserialized).isEqualTo(watermarkSpec); + Schema.Builder builder = Schema.newBuilder(); + FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized, builder); + assertThat(builder.build().getWatermarkSpecs()).hasSize(1); + Schema.UnresolvedWatermarkSpec actual = builder.build().getWatermarkSpecs().get(0); + assertThat(actual.getColumnName()).isEqualTo(watermarkSpec.getRowtimeAttribute()); + assertThat(actual.getWatermarkExpression().asSummaryString()) + .isEqualTo(watermarkSpec.getWatermarkExpression().asSummaryString()); } @Test @@ -150,4 +170,44 @@ public void testNonPhysicalColumnsCount() { oldStyleOptions, Arrays.asList("phy1", "phy2"))) .isEqualTo(3); } + + private static class TestResolvedExpression implements ResolvedExpression { + private final String name; + private final DataType outputDataType; + + private TestResolvedExpression(String name, DataType outputDataType) { + this.name = name; + this.outputDataType = outputDataType; + } + + @Override + public DataType getOutputDataType() { + return outputDataType; + } + + @Override + public List getResolvedChildren() { + return Collections.emptyList(); + } + + @Override + public String asSummaryString() { + return new SqlCallExpression(name).asSummaryString(); + } + + @Override + public String asSerializableString() { + return name; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(ExpressionVisitor expressionVisitor) { + return null; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 27a89510975f..e4286eb18172 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -850,7 +850,7 @@ private static void checkEquals(CatalogBaseTable t1, CatalogBaseTable t2) { 
assertThat(t2.getComment()).isEqualTo(t1.getComment()); assertThat(t2.getOptions()).isEqualTo(t1.getOptions()); if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) { - assertThat(t2.getSchema()).isEqualTo(t1.getSchema()); + assertThat(t2.getUnresolvedSchema()).isEqualTo(t1.getUnresolvedSchema()); assertThat(((CatalogTable) (t2)).getPartitionKeys()) .isEqualTo(((CatalogTable) (t1)).getPartitionKeys()); assertThat(((CatalogTable) (t2)).isPartitioned()) @@ -864,7 +864,12 @@ private static void checkEquals(CatalogBaseTable t1, CatalogBaseTable t2) { t2.getUnresolvedSchema() .resolve(new TestSchemaResolver())) .build()) - .isEqualTo(t1.getSchema().toSchema()); + .isEqualTo( + Schema.newBuilder() + .fromResolvedSchema( + t1.getUnresolvedSchema() + .resolve(new TestSchemaResolver())) + .build()); assertThat(mt2.getPartitionKeys()).isEqualTo(mt1.getPartitionKeys()); assertThat(mt2.isPartitioned()).isEqualTo(mt1.isPartitioned()); // validate definition query From 3c820828062abef86b278f5a0334b6e65570c54b Mon Sep 17 00:00:00 2001 From: WenjunMin Date: Mon, 2 Dec 2024 21:42:22 +0800 Subject: [PATCH 3/6] [core] Make default of 'lookup.local-file-type' to sort (#4622) --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- .../paimon/benchmark/lookup/AbstractLookupBenchmark.java | 5 ++++- .../paimon/benchmark/lookup/LookupReaderBenchmark.java | 2 +- .../src/main/java/org/apache/paimon/CoreOptions.java | 2 +- .../main/java/org/apache/paimon/io/cache/CacheBuilder.java | 4 ++++ 5 files changed, 11 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index fad1f4907e5a..2ad5db28b9ba 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -443,7 +443,7 @@

lookup.local-file-type
- hash
+ sort

Enum

The local file type for lookup.

Possible values:
  • "sort": Construct a sorted file for lookup.
  • "hash": Construct a hash file for lookup.
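With the default switching to `sort`, a table can still opt back into the previous behavior by setting the option explicitly. A minimal Flink SQL sketch (illustrative table name and schema, assuming a Paimon catalog is in use):

```sql
-- Illustrative primary-key table; 'lookup.local-file-type' is the option of interest
CREATE TABLE orders (
    order_id BIGINT,
    customer STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'changelog-producer' = 'lookup',
    -- keep the pre-change hash-based local lookup files
    'lookup.local-file-type' = 'hash'
);
```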
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java index 635d876f7a98..653bfee6cc00 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java @@ -102,7 +102,10 @@ protected Pair writeData( new CacheManager(MemorySize.ofMebiBytes(10)), keySerializer.createSliceComparator()); - File file = new File(tempDir.toFile(), UUID.randomUUID().toString()); + String name = + String.format( + "%s-%s-%s", options.lookupLocalFileType(), valueLength, bloomFilterEnabled); + File file = new File(tempDir.toFile(), UUID.randomUUID() + "-" + name); LookupStoreWriter writer = factory.createWriter(file, createBloomFiler(bloomFilterEnabled)); int i = 0; for (byte[] input : inputs) { diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java index 9947b54a70f0..2d8de84327d4 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java @@ -128,7 +128,7 @@ private void readData( LookupStoreFactory factory = LookupStoreFactory.create( options, - new CacheManager(MemorySize.ofMebiBytes(10)), + new CacheManager(MemorySize.ofMebiBytes(20), 0.5), new RowCompactedSerializer(RowType.of(new IntType())) .createSliceComparator()); diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index b9b5675f1d2d..cddef33c276e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -891,7 +891,7 @@ public class CoreOptions implements Serializable { public static final ConfigOption LOOKUP_LOCAL_FILE_TYPE = key("lookup.local-file-type") .enumType(LookupLocalFileType.class) - .defaultValue(LookupLocalFileType.HASH) + .defaultValue(LookupLocalFileType.SORT) .withDescription("The local file type for lookup."); public static final ConfigOption LOOKUP_HASH_LOAD_FACTOR = diff --git a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheBuilder.java b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheBuilder.java index 4660343d45e1..402f21f06264 100644 --- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheBuilder.java @@ -72,6 +72,10 @@ public Cache build() { org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder .newBuilder() .weigher(CacheBuilder::weigh) + // The concurrency level determines the number of segment caches in + // Guava,limiting the maximum block entries held in cache. Since we do + // not access this cache concurrently, it is set to 1. 
+ .concurrencyLevel(1) .maximumWeight(memorySize.getBytes()) .removalListener(this::onRemoval) .build()); From 039046a0d4d4aa4195f9187b2d0214f277316ce8 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Mon, 2 Dec 2024 22:52:10 +0800 Subject: [PATCH 4/6] [core] Extract decompressBlock method in SortLookupStoreReader --- .../lookup/sort/SortLookupStoreReader.java | 65 +++++++++---------- 1 file changed, 32 insertions(+), 33 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index 39997888ce92..6dbfe130e3bb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -106,7 +106,7 @@ public byte[] lookup(byte[] key) throws IOException { return null; } - private BlockIterator getNextBlock() throws IOException { + private BlockIterator getNextBlock() { // index block handle, point to the key, value position. MemorySlice blockHandle = indexBlockIterator.next().getValue(); BlockReader dataBlock = @@ -134,42 +134,41 @@ private BlockReader readBlock(BlockHandle blockHandle, boolean index) { blockCache.getBlock( blockHandle.offset(), blockHandle.size(), - bytes -> { - MemorySegment block = MemorySegment.wrap(bytes); - int crc32cCode = crc32c(block, blockTrailer.getCompressionType()); - checkArgument( - blockTrailer.getCrc32c() == crc32cCode, - String.format( - "Expected CRC32C(%d) but found CRC32C(%d) for file(%s)", - blockTrailer.getCrc32c(), crc32cCode, filePath)); - - // decompress data - BlockCompressionFactory compressionFactory = - BlockCompressionFactory.create( - blockTrailer.getCompressionType()); - if (compressionFactory == null) { - return bytes; - } else { - MemorySliceInput compressedInput = - MemorySlice.wrap(block).toInput(); - byte[] uncompressed = new byte[compressedInput.readVarLenInt()]; - BlockDecompressor decompressor = - compressionFactory.getDecompressor(); - int uncompressedLength = - decompressor.decompress( - block.getHeapMemory(), - compressedInput.position(), - compressedInput.available(), - uncompressed, - 0); - checkArgument(uncompressedLength == uncompressed.length); - return uncompressed; - } - }, + bytes -> decompressBlock(bytes, blockTrailer), index); return new BlockReader(MemorySlice.wrap(unCompressedBlock), comparator); } + private byte[] decompressBlock(byte[] compressedBytes, BlockTrailer blockTrailer) { + MemorySegment compressed = MemorySegment.wrap(compressedBytes); + int crc32cCode = crc32c(compressed, blockTrailer.getCompressionType()); + checkArgument( + blockTrailer.getCrc32c() == crc32cCode, + String.format( + "Expected CRC32C(%d) but found CRC32C(%d) for file(%s)", + blockTrailer.getCrc32c(), crc32cCode, filePath)); + + // decompress data + BlockCompressionFactory compressionFactory = + BlockCompressionFactory.create(blockTrailer.getCompressionType()); + if (compressionFactory == null) { + return compressedBytes; + } else { + MemorySliceInput compressedInput = MemorySlice.wrap(compressed).toInput(); + byte[] uncompressed = new byte[compressedInput.readVarLenInt()]; + BlockDecompressor decompressor = compressionFactory.getDecompressor(); + int uncompressedLength = + decompressor.decompress( + compressed.getHeapMemory(), + compressedInput.position(), + compressedInput.available(), + uncompressed, + 0); + checkArgument(uncompressedLength == uncompressed.length); + return uncompressed; + 
} + } + @Override public void close() throws IOException { if (bloomFilter != null) { From 300cc67c208c4b86e2edf58ad1981b86649fe892 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 3 Dec 2024 11:35:16 +0800 Subject: [PATCH 5/6] [spark] show table extended (#4603) * [spark] show table extended * update * [update] doc --- docs/content/spark/auxiliary.md | 11 ++ .../spark/PaimonPartitionManagement.scala | 2 +- .../analysis/PaimonResolvePartitionSpec.scala | 75 +++++++++++ .../PaimonSparkSessionExtensions.scala | 2 + .../org/apache/spark/sql/PaimonUtils.scala | 19 +++ .../catalog/PaimonCatalogImplicits.scala | 30 +++++ .../catalog/PaimonCatalogUtils.scala | 3 + .../spark/sql/paimon/shims/SparkShim.scala | 4 + .../paimon/spark/sql/DescribeTableTest.scala | 70 ++++++++++ .../analysis/Spark3ResolutionRules.scala | 56 ++++++++ .../PaimonShowTablePartitionCommand.scala | 96 ++++++++++++++ .../PaimonShowTablesExtendedCommand.scala | 123 ++++++++++++++++++ .../spark/sql/paimon/shims/Spark3Shim.scala | 8 +- .../analysis/Spark4ResolutionRules.scala | 27 ++++ .../spark/sql/paimon/shims/Spark4Shim.scala | 9 +- 15 files changed, 532 insertions(+), 3 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala create mode 100644 paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala create mode 100644 paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala create mode 100644 paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala create mode 100644 paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md index 6330ca27ce31..5de0289565f2 100644 --- a/docs/content/spark/auxiliary.md +++ b/docs/content/spark/auxiliary.md @@ -96,6 +96,17 @@ SHOW PARTITIONS my_table; SHOW PARTITIONS my_table PARTITION (dt=20230817); ``` +## Show Table Extended +The SHOW TABLE EXTENDED statement is used to list table or partition information. + +```sql +-- Lists tables that satisfy regular expressions +SHOW TABLE EXTENDED IN db_name LIKE 'test*'; + +-- Lists the specified partition information for the table +SHOW TABLE EXTENDED IN db_name LIKE 'table_name' PARTITION(pt = '2024'); +``` + ## Analyze table The ANALYZE TABLE statement collects statistics about the table, that are to be used by the query optimizer to find a better query execution plan. 
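For readers driving the statement from code rather than a SQL shell, the snippet below is a minimal sketch of how the SHOW TABLE EXTENDED statement documented above could be issued through Spark's Java API and how its four result columns (namespace, tableName, isTemporary, information) might be inspected. The `db_name`, `table_name`, and `pt` identifiers are the placeholder names from the SQL examples; the class name and the session setup are assumptions, and the sketch presumes a SparkSession whose current catalog is already a Paimon catalog.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ShowTableExtendedExample {

    public static void main(String[] args) {
        // Assumes a session whose current catalog is already a Paimon catalog.
        SparkSession spark = SparkSession.builder()
                .appName("show-table-extended-example")
                .getOrCreate();

        // Listing form: one row per matching table with the columns
        // namespace, tableName, isTemporary and information.
        Dataset<Row> tables = spark.sql("SHOW TABLE EXTENDED IN db_name LIKE '*'");
        tables.select("namespace", "tableName", "isTemporary").show(false);

        // Partition form: the information column carries the partition details,
        // e.g. the "Partition Values" entry.
        Dataset<Row> partition = spark.sql(
                "SHOW TABLE EXTENDED IN db_name LIKE 'table_name' PARTITION (pt = '2024')");
        String information = partition.select("information").first().getString(0);
        System.out.println(information);
    }
}
```

Since `information` is a newline-separated list of `key: value` pairs, callers typically split it on line breaks, which is also how the DescribeTableTest added below asserts individual entries.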
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 9a305ca59a0f..840f1341a69d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -100,7 +100,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement { } override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = { - throw new UnsupportedOperationException("Load partition is not supported") + Map.empty[String, String].asJava } override def listPartitionIdentifiers( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala new file mode 100644 index 000000000000..5d6a5a063c06 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.PaimonUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec.conf +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} + +object PaimonResolvePartitionSpec { + + def resolve( + catalog: TableCatalog, + tableIndent: Identifier, + partitionSpec: PartitionSpec): ResolvedPartitionSpec = { + val table = catalog.loadTable(tableIndent).asPartitionable + partitionSpec match { + case u: UnresolvedPartitionSpec => + val partitionSchema = table.partitionSchema() + resolvePartitionSpec(table.name(), u, partitionSchema, allowPartitionSpec = false) + case o => o.asInstanceOf[ResolvedPartitionSpec] + } + } + + private def resolvePartitionSpec( + tableName: String, + partSpec: UnresolvedPartitionSpec, + partSchema: StructType, + allowPartitionSpec: Boolean): ResolvedPartitionSpec = { + val normalizedSpec = normalizePartitionSpec(partSpec.spec, partSchema, tableName, conf.resolver) + if (!allowPartitionSpec) { + requireExactMatchedPartitionSpec(tableName, normalizedSpec, partSchema.fieldNames) + } + val partitionNames = normalizedSpec.keySet + val requestedFields = partSchema.filter(field => partitionNames.contains(field.name)) + ResolvedPartitionSpec( + requestedFields.map(_.name), + convertToPartIdent(normalizedSpec, requestedFields), + partSpec.location) + } + + def convertToPartIdent( + partitionSpec: TablePartitionSpec, + schema: Seq[StructField]): InternalRow = { + val partValues = schema.map { + part => + val raw = partitionSpec.get(part.name).orNull + val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType) + Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval() + } + InternalRow.fromSeq(partValues) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala index e8f75d394a81..f73df64fb8ab 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala @@ -40,6 +40,8 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) { extensions.injectResolutionRule(spark => new PaimonAnalysis(spark)) extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark)) extensions.injectResolutionRule(spark => PaimonViewResolver(spark)) + extensions.injectResolutionRule( + spark => SparkShimLoader.getSparkShim.createCustomResolution(spark)) extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark)) extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark)) diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala index 4492d856ad50..cc49e787dc81 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala @@ -20,11 +20,15 @@ package org.apache.spark.sql import org.apache.spark.executor.OutputMetrics import org.apache.spark.rdd.InputFileBlockHolder +import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.PartitioningUtils import org.apache.spark.util.{Utils => SparkUtils} /** @@ -87,4 +91,19 @@ object PaimonUtils { outputMetrics.setBytesWritten(bytesWritten) outputMetrics.setRecordsWritten(recordsWritten) } + + def normalizePartitionSpec[T]( + partitionSpec: Map[String, T], + partCols: StructType, + tblName: String, + resolver: Resolver): Map[String, T] = { + PartitioningUtils.normalizePartitionSpec(partitionSpec, partCols, tblName, resolver) + } + + def requireExactMatchedPartitionSpec( + tableName: String, + spec: TablePartitionSpec, + partitionColumnNames: Seq[String]): Unit = { + PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames) + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala new file mode 100644 index 000000000000..f1f20fb6fb31 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.catalog + +object PaimonCatalogImplicits { + + import CatalogV2Implicits._ + + implicit class PaimonCatalogHelper(plugin: CatalogPlugin) extends CatalogHelper(plugin) + + implicit class PaimonNamespaceHelper(namespace: Array[String]) extends NamespaceHelper(namespace) + +// implicit class PaimonTableHelper(table: Table) extends TableHelper(table) +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala index 2ab3dc494524..5db6894ba093 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.ExternalCatalog +import org.apache.spark.sql.connector.catalog.CatalogV2Util import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.paimon.ReflectUtils @@ -40,4 +41,6 @@ object PaimonCatalogUtils { hadoopConf) } + val TABLE_RESERVED_PROPERTIES: Seq[String] = CatalogV2Util.TABLE_RESERVED_PROPERTIES + } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala index bd85282737e9..334bd6e93180 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala @@ -24,6 +24,8 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -39,6 +41,8 @@ trait SparkShim { def createSparkParser(delegate: ParserInterface): ParserInterface + def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] + def createSparkInternalRow(rowType: RowType): SparkInternalRow def createSparkArrayData(elementType: DataType): SparkArrayData diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala index 528dcd3cd107..ae538fa48c4e 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala @@ -27,6 +27,76 @@ import java.util.Objects class DescribeTableTest extends PaimonSparkTestBase { + test("Paimon show: show table extended") { + val testDB = "test_show" + withDatabase(testDB) { + spark.sql("CREATE TABLE s1 (id INT)") + + spark.sql(s"CREATE DATABASE $testDB") + spark.sql(s"USE $testDB") + spark.sql("CREATE TABLE s2 (id INT, pt 
STRING) PARTITIONED BY (pt)") + spark.sql("CREATE TABLE s3 (id INT, pt1 STRING, pt2 STRING) PARTITIONED BY (pt1, pt2)") + + spark.sql("INSERT INTO s2 VALUES (1, '2024'), (2, '2024'), (3, '2025'), (4, '2026')") + spark.sql(""" + |INSERT INTO s3 + |VALUES + |(1, '2024', '11'), (2, '2024', '12'), (3, '2025', '11'), (4, '2025', '12') + |""".stripMargin) + + // SHOW TABL EXTENDED will give four columns: namespace, tableName, isTemporary, information. + checkAnswer( + sql(s"SHOW TABLE EXTENDED IN $dbName0 LIKE '*'") + .select("namespace", "tableName", "isTemporary"), + Row("test", "s1", false)) + checkAnswer( + sql(s"SHOW TABLE EXTENDED IN $testDB LIKE '*'") + .select("namespace", "tableName", "isTemporary"), + Row(testDB, "s2", false) :: Row(testDB, "s3", false) :: Nil + ) + + // check table s1 + val res1 = spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2'").select("information") + Assertions.assertEquals(1, res1.count()) + val information1 = res1 + .collect() + .head + .getString(0) + .split("\n") + .map { + line => + val kv = line.split(": ", 2) + kv(0) -> kv(1) + } + .toMap + Assertions.assertEquals(information1("Catalog"), "paimon") + Assertions.assertEquals(information1("Namespace"), testDB) + Assertions.assertEquals(information1("Table"), "s2") + Assertions.assertEquals(information1("Provider"), "paimon") + Assertions.assertEquals(information1("Location"), loadTable(testDB, "s2").location().toString) + + // check table s2 partition info + val error1 = intercept[Exception] { + spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt='2022')") + }.getMessage + assert(error1.contains("PARTITIONS_NOT_FOUND")) + + val error2 = intercept[Exception] { + spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1='2024')") + }.getMessage + assert(error2.contains("Partition spec is invalid")) + + val res2 = + spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1 = '2024', pt2 = 11)") + checkAnswer( + res2.select("namespace", "tableName", "isTemporary"), + Row(testDB, "s3", false) + ) + Assertions.assertTrue( + res2.select("information").collect().head.getString(0).contains("Partition Values")) + } + } + test(s"Paimon describe: describe table comment") { var comment = "test comment" spark.sql(s""" diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala new file mode 100644 index 000000000000..924df2d1e320 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.paimon.spark.commands.{PaimonShowTablePartitionCommand, PaimonShowTablesExtendedCommand} + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedNamespace, UnresolvedPartitionSpec} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTableExtended} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.Identifier + +case class Spark3ResolutionRules(session: SparkSession) + extends Rule[LogicalPlan] + with SQLConfHelper { + + import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { + case ShowTableExtended( + ResolvedNamespace(catalog, ns), + pattern, + partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))), + output) => + partitionSpec + .map { + spec: PartitionSpec => + val table = Identifier.of(ns.toArray, pattern) + val resolvedSpec = + PaimonResolvePartitionSpec.resolve(catalog.asTableCatalog, table, spec) + PaimonShowTablePartitionCommand(output, catalog.asTableCatalog, table, resolvedSpec) + } + .getOrElse { + PaimonShowTablesExtendedCommand(catalog.asTableCatalog, ns, pattern, output) + } + + } + +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala new file mode 100644 index 000000000000..32f94985859c --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec +import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName +import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString} +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog} +import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +case class PaimonShowTablePartitionCommand( + override val output: Seq[Attribute], + catalog: TableCatalog, + tableIndent: Identifier, + partSpec: ResolvedPartitionSpec) + extends PaimonLeafRunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { + val rows = new mutable.ArrayBuffer[Row]() + val table = catalog.loadTable(tableIndent) + val information = getTablePartitionDetails(tableIndent, table.asPartitionable, partSpec) + rows += Row(tableIndent.namespace.quoted, tableIndent.name(), false, s"$information\n") + + rows.toSeq + } + + private def getTablePartitionDetails( + tableIdent: Identifier, + partitionTable: SupportsPartitionManagement, + partSpec: ResolvedPartitionSpec): String = { + val results = new mutable.LinkedHashMap[String, String]() + + // "Partition Values" + val partitionSchema = partitionTable.partitionSchema() + val (names, ident) = (partSpec.names, partSpec.ident) + val partitionIdentifiers = partitionTable.listPartitionIdentifiers(names.toArray, ident) + if (partitionIdentifiers.isEmpty) { + val part = ident + .toSeq(partitionSchema) + .zip(partitionSchema.map(_.name)) + .map(kv => s"${kv._2}" + s" = ${kv._1}") + .mkString(", ") + throw new RuntimeException( + s""" + |[PARTITIONS_NOT_FOUND] The partition(s) PARTITION ($part) cannot be found in table ${tableIdent.toString}. + |Verify the partition specification and table name. 
+ |""".stripMargin) + } + assert(partitionIdentifiers.length == 1) + val row = partitionIdentifiers.head + val len = partitionSchema.length + val partitions = new Array[String](len) + val timeZoneId = conf.sessionLocalTimeZone + for (i <- 0 until len) { + val dataType = partitionSchema(i).dataType + val partValueUTF8String = + ToPrettyString(Literal(row.get(i, dataType), dataType), Some(timeZoneId)).eval(null) + val partValueStr = if (partValueUTF8String == null) "null" else partValueUTF8String.toString + partitions(i) = escapePathName(partitionSchema(i).name) + "=" + escapePathName(partValueStr) + } + val partitionValues = partitions.mkString("[", ", ", "]") + results.put("Partition Values", s"$partitionValues") + + // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics" + + results + .map { + case (key, value) => + if (value.isEmpty) key else s"$key: $value" + } + .mkString("", "\n", "") + } +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala new file mode 100644 index 000000000000..b393982e25d3 --- /dev/null +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.commands + +import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.{QuotingUtils, StringUtils} +import org.apache.spark.sql.connector.catalog.{Identifier, PaimonCatalogUtils, SupportsPartitionManagement, Table, TableCatalog} +import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._ +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +case class PaimonShowTablesExtendedCommand( + catalog: TableCatalog, + namespace: Seq[String], + pattern: String, + override val output: Seq[Attribute], + isExtended: Boolean = false, + partitionSpec: Option[TablePartitionSpec] = None) + extends PaimonLeafRunnableCommand { + + override def run(spark: SparkSession): Seq[Row] = { + val rows = new mutable.ArrayBuffer[Row]() + + val tables = catalog.listTables(namespace.toArray) + tables.map { + tableIdent: Identifier => + if (StringUtils.filterPattern(Seq(tableIdent.name()), pattern).nonEmpty) { + val table = catalog.loadTable(tableIdent) + val information = getTableDetails(catalog.name, tableIdent, table) + rows += Row(tableIdent.namespace().quoted, tableIdent.name(), false, s"$information\n") + } + } + + // TODO: view + + rows.toSeq + } + + private def getTableDetails(catalogName: String, identifier: Identifier, table: Table): String = { + val results = new mutable.LinkedHashMap[String, String]() + + results.put("Catalog", catalogName) + results.put("Namespace", identifier.namespace().quoted) + results.put("Table", identifier.name()) + val tableType = if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + results.put("Type", tableType.name) + + PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES + .filterNot(_ == TableCatalog.PROP_EXTERNAL) + .foreach( + propKey => { + if (table.properties.containsKey(propKey)) { + results.put(propKey.capitalize, table.properties.get(propKey)) + } + }) + + val properties: Seq[String] = + conf + .redactOptions(table.properties.asScala.toMap) + .toList + .filter(kv => !PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES.contains(kv._1)) + .sortBy(_._1) + .map { case (key, value) => key + "=" + value } + if (!table.properties().isEmpty) { + results.put("Table Properties", properties.mkString("[", ", ", "]")) + } + + // Partition Provider & Partition Columns + if (supportsPartitions(table) && table.asPartitionable.partitionSchema().nonEmpty) { + results.put("Partition Provider", "Catalog") + results.put( + "Partition Columns", + table.asPartitionable + .partitionSchema() + .map(field => QuotingUtils.quoteIdentifier(field.name)) + .mkString("[", ", ", "]")) + } + + if (table.schema().nonEmpty) { + results.put("Schema", table.schema().treeString) + } + + results + .map { + case (key, value) => + if (value.isEmpty) key else s"$key: $value" + } + .mkString("", "\n", "") + } + + private def supportsPartitions(table: Table): Boolean = table match { + case _: SupportsPartitionManagement => true + case _ => false + } + +} diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala index 57d79d6474e9..f508e2605cbc 100644 --- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala +++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} @@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -38,6 +40,10 @@ class Spark3Shim extends SparkShim { new PaimonSpark3SqlExtensionsParser(delegate) } + override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = { + Spark3ResolutionRules(spark) + } + override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { new Spark3InternalRow(rowType) } diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala new file mode 100644 index 000000000000..461cbd0c938a --- /dev/null +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.catalyst.analysis + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan +} diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala index dfec4eb71f4f..eefddafdbfb8 100644 --- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala +++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.paimon.shims +import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow} import org.apache.paimon.types.{DataType, RowType} @@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType} import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.Aggregate +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.internal.ExpressionUtils @@ -38,6 +40,11 @@ class Spark4Shim extends SparkShim { override def createSparkParser(delegate: ParserInterface): ParserInterface = { new PaimonSpark4SqlExtensionsParser(delegate) } + + override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = { + Spark4ResolutionRules(spark) + } + override def createSparkInternalRow(rowType: RowType): SparkInternalRow = { new Spark4InternalRow(rowType) } From d33b8711fc6b4e1f35ba7d85336be4ff3baa956d Mon Sep 17 00:00:00 2001 From: "aiden.dong" <782112163@qq.com> Date: Tue, 3 Dec 2024 13:33:44 +0800 Subject: [PATCH 6/6] [core] Optimization of Parquet Predicate Pushdown Capability (#4608) --- .../table/PrimaryKeyFileStoreTableTest.java | 63 ++++++ .../format/parquet/ParquetReaderFactory.java | 66 +++++- .../parquet/reader/AbstractColumnReader.java | 204 +++++++++++++----- .../parquet/reader/BooleanColumnReader.java | 36 +++- .../parquet/reader/ByteColumnReader.java | 39 +++- .../parquet/reader/BytesColumnReader.java | 41 +++- .../parquet/reader/DoubleColumnReader.java | 38 +++- .../reader/FixedLenBytesColumnReader.java | 36 +++- .../parquet/reader/FloatColumnReader.java | 38 +++- .../parquet/reader/IntColumnReader.java | 39 +++- .../parquet/reader/LongColumnReader.java | 39 +++- .../parquet/reader/NestedColumnReader.java | 2 +- .../reader/NestedPrimitiveColumnReader.java | 141 +++++++----- .../parquet/reader/ParquetReadState.java | 148 +++++++++++++ .../reader/ParquetSplitReaderUtil.java | 41 ++-- .../parquet/reader/RunLengthDecoder.java | 45 ++++ .../parquet/reader/ShortColumnReader.java | 38 +++- .../parquet/reader/TimestampColumnReader.java | 15 +- 18 files changed, 898 insertions(+), 171 deletions(-) create 
mode 100644 paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 46b85223bc2f..e80b49a0f05d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -84,6 +84,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -809,6 +810,68 @@ public void testDeletionVectorsWithFileIndexInFile() throws Exception { "1|4|500|binary|varbinary|mapKey:mapVal|multiset")); } + @Test + public void testDeletionVectorsWithParquetFilter() throws Exception { + FileStoreTable table = + createFileStoreTable( + conf -> { + conf.set(BUCKET, 1); + conf.set(DELETION_VECTORS_ENABLED, true); + conf.set(FILE_FORMAT, "parquet"); + conf.set("parquet.block.size", "1048576"); + conf.set("parquet.page.size", "1024"); + }); + + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + + BatchTableWrite write = + (BatchTableWrite) + writeBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + + for (int i = 0; i < 200000; i++) { + write.write(rowData(1, i, i * 100L)); + } + + List messages = write.prepareCommit(); + BatchTableCommit commit = writeBuilder.newCommit(); + commit.commit(messages); + write = + (BatchTableWrite) + writeBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + for (int i = 180000; i < 200000; i++) { + write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L)); + } + + messages = write.prepareCommit(); + commit = writeBuilder.newCommit(); + commit.commit(messages); + + PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); + List splits = toSplits(table.newSnapshotReader().read().dataSplits()); + Random random = new Random(); + + for (int i = 0; i < 10; i++) { + int value = random.nextInt(180000); + TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); + assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) + .isEqualTo( + Arrays.asList( + String.format( + "%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset", + 1, value, value * 100L))); + } + + for (int i = 0; i < 10; i++) { + int value = 180000 + random.nextInt(20000); + TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); + assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty(); + } + } + @Test public void testDeletionVectorsWithFileIndexInMeta() throws Exception { FileStoreTable table = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index f0151d6f3d8f..0c996531201a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.parquet.reader.ColumnReader; import org.apache.paimon.format.parquet.reader.ParquetDecimalVector; +import org.apache.paimon.format.parquet.reader.ParquetReadState; import 
org.apache.paimon.format.parquet.reader.ParquetTimestampVector; import org.apache.paimon.format.parquet.type.ParquetField; import org.apache.paimon.fs.Path; @@ -130,7 +131,7 @@ public FileRecordReader createReader(FormatReaderFactory.Context co buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO); return new ParquetReader( - reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields); + reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields); } private void setReadOptions(ParquetReadOptions.Builder builder) { @@ -336,6 +337,10 @@ private class ParquetReader implements FileRecordReader { private long nextRowPosition; + private ParquetReadState currentRowGroupReadState; + + private long currentRowGroupFirstRowIndex; + /** * For each request column, the reader to read this column. This is NULL if this column is * missing from the file, in which case we populate the attribute with NULL. @@ -359,6 +364,7 @@ private ParquetReader( this.totalCountLoadedSoFar = 0; this.currentRowPosition = 0; this.nextRowPosition = 0; + this.currentRowGroupFirstRowIndex = 0; this.fields = fields; } @@ -390,7 +396,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException { currentRowPosition = nextRowPosition; } - int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned); + int num = getBachSize(); + for (int i = 0; i < columnReaders.length; ++i) { if (columnReaders[i] == null) { batch.writableVectors[i].fillWithNulls(); @@ -400,13 +407,13 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException { } } rowsReturned += num; - nextRowPosition = currentRowPosition + num; + nextRowPosition = getNextRowPosition(num); batch.columnarBatch.setNumRows(num); return true; } private void readNextRowGroup() throws IOException { - PageReadStore rowGroup = reader.readNextRowGroup(); + PageReadStore rowGroup = reader.readNextFilteredRowGroup(); if (rowGroup == null) { throw new IOException( "expecting more rows but reached last block. Read " @@ -415,6 +422,9 @@ private void readNextRowGroup() throws IOException { + totalRowCount); } + this.currentRowGroupReadState = + new ParquetReadState(rowGroup.getRowIndexes().orElse(null)); + List types = requestedSchema.getFields(); columnReaders = new ColumnReader[types.size()]; for (int i = 0; i < types.size(); ++i) { @@ -429,18 +439,62 @@ private void readNextRowGroup() throws IOException { 0); } } + totalCountLoadedSoFar += rowGroup.getRowCount(); - if (rowGroup.getRowIndexOffset().isPresent()) { - currentRowPosition = rowGroup.getRowIndexOffset().get(); + + if (rowGroup.getRowIndexOffset().isPresent()) { // filter + currentRowGroupFirstRowIndex = rowGroup.getRowIndexOffset().get(); + long pageIndex = 0; + if (!this.currentRowGroupReadState.isMaxRange()) { + pageIndex = this.currentRowGroupReadState.currentRangeStart(); + } + currentRowPosition = currentRowGroupFirstRowIndex + pageIndex; } else { if (reader.rowGroupsFiltered()) { throw new RuntimeException( "There is a bug, rowIndexOffset must be present when row groups are filtered."); } + currentRowGroupFirstRowIndex = nextRowPosition; currentRowPosition = nextRowPosition; } } + private int getBachSize() throws IOException { + + long rangeBatchSize = Long.MAX_VALUE; + if (this.currentRowGroupReadState.isFinished()) { + throw new IOException( + "expecting more rows but reached last page block. 
Read " + + rowsReturned + + " out of " + + totalRowCount); + } else if (!this.currentRowGroupReadState.isMaxRange()) { + long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex; + rangeBatchSize = this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1; + } + + return (int) + Math.min( + batchSize, + Math.min(rangeBatchSize, totalCountLoadedSoFar - rowsReturned)); + } + + private long getNextRowPosition(int num) { + if (this.currentRowGroupReadState.isMaxRange()) { + return this.currentRowPosition + num; + } else { + long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex; + long nextIndex = pageIndex + num; + + if (this.currentRowGroupReadState.currentRangeEnd() < nextIndex) { + this.currentRowGroupReadState.nextRange(); + nextIndex = this.currentRowGroupReadState.currentRangeStart(); + } + + return nextIndex; + } + } + private ParquetReaderBatch getCachedEntry() throws IOException { try { return pool.pollEntry(); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java index 7e2ab6d5e7f0..5e3f4a7e6a33 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java @@ -32,6 +32,7 @@ import org.apache.parquet.column.page.DataPageV1; import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; @@ -65,20 +66,16 @@ public abstract class AbstractColumnReader protected final ColumnDescriptor descriptor; - /** Total number of values read. */ - private long valuesRead; - - /** - * value that indicates the end of the current page. That is, if valuesRead == - * endOfPageValueCount, we are at the end of the page. - */ - private long endOfPageValueCount; - /** If true, the current page is dictionary encoded. */ private boolean isCurrentPageDictionaryEncoded; /** Total values in the current page. */ - private int pageValueCount; + // private int pageValueCount; + + /** + * Helper struct to track intermediate states while reading Parquet pages in the column chunk. + */ + private final ParquetReadState readState; /* * Input streams: @@ -101,12 +98,14 @@ public abstract class AbstractColumnReader /** Dictionary decoder to wrap dictionary ids input stream. 
*/ private RunLengthDecoder dictionaryIdsDecoder; - public AbstractColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public AbstractColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { this.descriptor = descriptor; - this.pageReader = pageReader; + this.pageReader = pageReadStore.getPageReader(descriptor); this.maxDefLevel = descriptor.getMaxDefinitionLevel(); + this.readState = new ParquetReadState(pageReadStore.getRowIndexes().orElse(null)); + DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); if (dictionaryPage != null) { try { @@ -147,56 +146,136 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException if (dictionary != null) { dictionaryIds = vector.reserveDictionaryIds(readNumber); } - while (readNumber > 0) { + + readState.resetForNewBatch(readNumber); + + while (readState.rowsToReadInBatch > 0) { // Compute the number of values we want to read in this page. - int leftInPage = (int) (endOfPageValueCount - valuesRead); - if (leftInPage == 0) { - DataPage page = pageReader.readPage(); - if (page instanceof DataPageV1) { - readPageV1((DataPageV1) page); - } else if (page instanceof DataPageV2) { - readPageV2((DataPageV2) page); - } else { - throw new RuntimeException("Unsupported page type: " + page.getClass()); + if (readState.valuesToReadInPage == 0) { + int pageValueCount = readPage(); + if (pageValueCount < 0) { + // we've read all the pages; this could happen when we're reading a repeated + // list and we + // don't know where the list will end until we've seen all the pages. + break; } - leftInPage = (int) (endOfPageValueCount - valuesRead); } - int num = Math.min(readNumber, leftInPage); - if (isCurrentPageDictionaryEncoded) { - // Read and decode dictionary ids. - runLenDecoder.readDictionaryIds( - num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder); - - if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) { - // Column vector supports lazy decoding of dictionary values so just set the - // dictionary. - // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. - // some - // non-dictionary encoded values have already been added). - vector.setDictionary(new ParquetDictionary(dictionary)); + + if (readState.isFinished()) { + break; + } + + long pageRowId = readState.rowId; + int leftInBatch = readState.rowsToReadInBatch; + int leftInPage = readState.valuesToReadInPage; + + int readBatch = Math.min(leftInBatch, leftInPage); + + long rangeStart = readState.currentRangeStart(); + long rangeEnd = readState.currentRangeEnd(); + + if (pageRowId < rangeStart) { + int toSkip = (int) (rangeStart - pageRowId); + if (toSkip >= leftInPage) { // drop page + pageRowId += leftInPage; + leftInPage = 0; } else { - readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds); + if (isCurrentPageDictionaryEncoded) { + runLenDecoder.skipDictionaryIds( + toSkip, maxDefLevel, this.dictionaryIdsDecoder); + pageRowId += toSkip; + leftInPage -= toSkip; + } else { + skipBatch(toSkip); + pageRowId += toSkip; + leftInPage -= toSkip; + } } + } else if (pageRowId > rangeEnd) { + readState.nextRange(); } else { - if (vector.hasDictionary() && rowId != 0) { - // This batch already has dictionary encoded values but this new page is not. - // The batch - // does not support a mix of dictionary and not so we will decode the - // dictionary. 
- readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds()); + long start = pageRowId; + long end = Math.min(rangeEnd, pageRowId + readBatch - 1); + int num = (int) (end - start + 1); + + if (isCurrentPageDictionaryEncoded) { + // Read and decode dictionary ids. + runLenDecoder.readDictionaryIds( + num, + dictionaryIds, + vector, + rowId, + maxDefLevel, + this.dictionaryIdsDecoder); + + if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) { + // Column vector supports lazy decoding of dictionary values so just set the + // dictionary. + // We can't do this if rowId != 0 AND the column doesn't have a dictionary + // (i.e. + // some + // non-dictionary encoded values have already been added). + vector.setDictionary(new ParquetDictionary(dictionary)); + } else { + readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds); + } + } else { + if (vector.hasDictionary() && rowId != 0) { + // This batch already has dictionary encoded values but this new page is + // not. + // The batch + // does not support a mix of dictionary and not so we will decode the + // dictionary. + readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds()); + } + vector.setDictionary(null); + readBatch(rowId, num, vector); } - vector.setDictionary(null); - readBatch(rowId, num, vector); + leftInBatch -= num; + pageRowId += num; + leftInPage -= num; + rowId += num; } + readState.rowsToReadInBatch = leftInBatch; + readState.valuesToReadInPage = leftInPage; + readState.rowId = pageRowId; + } + } - valuesRead += num; - rowId += num; - readNumber -= num; + private int readPage() { + DataPage page = pageReader.readPage(); + if (page == null) { + return -1; } + long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); + + int pageValueCount = + page.accept( + new DataPage.Visitor() { + @Override + public Integer visit(DataPageV1 dataPageV1) { + try { + return readPageV1(dataPageV1); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Integer visit(DataPageV2 dataPageV2) { + try { + return readPageV2(dataPageV2); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + readState.resetForNewPage(pageValueCount, pageFirstRowIndex); + return pageValueCount; } - private void readPageV1(DataPageV1 page) throws IOException { - this.pageValueCount = page.getValueCount(); + private int readPageV1(DataPageV1 page) throws IOException { + int pageValueCount = page.getValueCount(); ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); // Initialize the decoders. @@ -211,30 +290,31 @@ private void readPageV1(DataPageV1 page) throws IOException { ByteBufferInputStream in = bytes.toInputStream(); rlReader.initFromPage(pageValueCount, in); this.runLenDecoder.initFromStream(pageValueCount, in); - prepareNewPage(page.getValueEncoding(), in); + prepareNewPage(page.getValueEncoding(), in, pageValueCount); + return pageValueCount; } catch (IOException e) { throw new IOException("could not read page " + page + " in col " + descriptor, e); } } - private void readPageV2(DataPageV2 page) throws IOException { - this.pageValueCount = page.getValueCount(); + private int readPageV2(DataPageV2 page) throws IOException { + int pageValueCount = page.getValueCount(); int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel()); // do not read the length from the stream. v2 pages handle dividing the page bytes. 
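        // Definition levels use Parquet's RLE/bit-packed hybrid encoding with just enough bits
        // for the column's maximum definition level; in a v2 page they arrive on their own stream,
        // separate from the values, so the decoder is initialized here before the value encoding
        // is prepared.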
this.runLenDecoder = new RunLengthDecoder(bitWidth, false); this.runLenDecoder.initFromStream( - this.pageValueCount, page.getDefinitionLevels().toInputStream()); + pageValueCount, page.getDefinitionLevels().toInputStream()); try { - prepareNewPage(page.getDataEncoding(), page.getData().toInputStream()); + prepareNewPage(page.getDataEncoding(), page.getData().toInputStream(), pageValueCount); + return pageValueCount; } catch (IOException e) { throw new IOException("could not read page " + page + " in col " + descriptor, e); } } - private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in) + private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in, int pageValueCount) throws IOException { - this.endOfPageValueCount = valuesRead + pageValueCount; if (dataEncoding.usesDictionary()) { if (dictionary == null) { throw new IOException( @@ -269,6 +349,14 @@ private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in) afterReadPage(); } + final void skipDataBuffer(int length) { + try { + dataInputStream.skipFully(length); + } catch (IOException e) { + throw new ParquetDecodingException("Failed to skip " + length + " bytes", e); + } + } + final ByteBuffer readDataBuffer(int length) { try { return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN); @@ -291,6 +379,8 @@ protected boolean supportLazyDecode() { /** Read batch from {@link #runLenDecoder} and {@link #dataInputStream}. */ protected abstract void readBatch(int rowId, int num, VECTOR column); + protected abstract void skipBatch(int num); + /** * Decode dictionary ids to data. From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}. */ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java index d5dc231d8436..83d3c5a07d4b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.PrimitiveType; @@ -36,9 +36,9 @@ public class BooleanColumnReader extends AbstractColumnReader 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + for (int i = 0; i < n; i++) { + readBoolean(); + } + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + readBoolean(); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + private boolean readBoolean() { if (bitOffset == 0) { try { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java index bed9923d9be3..804b8bc0275e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java +++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,8 +31,9 @@ /** Byte {@link ColumnReader}. Using INT32 to store byte, so just cast int to byte. */ public class ByteColumnReader extends AbstractColumnReader { - public ByteColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { - super(descriptor, pageReader); + public ByteColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT32); } @@ -69,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableByteVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipByte(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipByte(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipByte(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableByteVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java index e83115c8a69f..6ee395e58568 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,9 +31,9 @@ /** Bytes {@link ColumnReader}. A int length and bytes data. 
*/ public class BytesColumnReader extends AbstractColumnReader { - public BytesColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public BytesColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.BINARY); } @@ -70,6 +70,41 @@ protected void readBatch(int rowId, int num, WritableBytesVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipBinary(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipBinary(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipBinary(int num) { + for (int i = 0; i < num; i++) { + int len = readDataBuffer(4).getInt(); + skipDataBuffer(len); + } + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableBytesVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java index d6d8aa2bbb22..2cffd406248e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,9 +31,9 @@ /** Double {@link ColumnReader}. 
*/ public class DoubleColumnReader extends AbstractColumnReader { - public DoubleColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public DoubleColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.DOUBLE); } @@ -70,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableDoubleVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipDouble(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipDouble(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipDouble(int num) { + skipDataBuffer(8 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableDoubleVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java index afce717a6719..25e1b466e465 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java @@ -25,7 +25,7 @@ import org.apache.paimon.format.parquet.ParquetSchemaConverter; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.PrimitiveType; @@ -39,8 +39,9 @@ public class FixedLenBytesColumnReader private final int precision; public FixedLenBytesColumnReader( - ColumnDescriptor descriptor, PageReader pageReader, int precision) throws IOException { - super(descriptor, pageReader); + ColumnDescriptor descriptor, PageReadStore pageReadStore, int precision) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY); this.precision = precision; } @@ -79,6 +80,35 @@ protected void readBatch(int rowId, int num, VECTOR column) { } } + @Override + protected void skipBatch(int num) { + int bytesLen = descriptor.getPrimitiveType().getTypeLength(); + if (ParquetSchemaConverter.is32BitDecimal(precision)) { + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + skipDataBinary(bytesLen); + } + } + } else if (ParquetSchemaConverter.is64BitDecimal(precision)) { + + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + skipDataBinary(bytesLen); + } + } + } else { + for (int i = 0; i < num; i++) { + if (runLenDecoder.readInteger() == maxDefLevel) { + skipDataBinary(bytesLen); + } + } + } + } + + private void skipDataBinary(int len) { + skipDataBuffer(len); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, VECTOR column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java index 1f4adfa4b9c8..e9eec13df5fc 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,9 +31,9 @@ /** Float {@link ColumnReader}. */ public class FloatColumnReader extends AbstractColumnReader { - public FloatColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public FloatColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) throws IOException { - super(descriptor, pageReader); + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.FLOAT); } @@ -70,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableFloatVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipFloat(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipFloat(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipFloat(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableFloatVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java index e38e916d187e..521ad998f6f1 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java @@ -21,7 +21,7 @@ import org.apache.paimon.data.columnar.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -30,8 +30,9 @@ /** Int {@link ColumnReader}. 
*/ public class IntColumnReader extends AbstractColumnReader { - public IntColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { - super(descriptor, pageReader); + public IntColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT32); } @@ -68,6 +69,38 @@ protected void readBatch(int rowId, int num, WritableIntVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipInteger(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipInteger(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipInteger(int num) { + skipDataBuffer(4 * num); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableIntVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java index a8e04eae673a..c4af086a7026 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableLongVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -31,8 +31,9 @@ /** Long {@link ColumnReader}. 
*/ public class LongColumnReader extends AbstractColumnReader { - public LongColumnReader(ColumnDescriptor descriptor, PageReader pageReader) throws IOException { - super(descriptor, pageReader); + public LongColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore) + throws IOException { + super(descriptor, pageReadStore); checkTypeName(PrimitiveType.PrimitiveTypeName.INT64); } @@ -69,6 +70,38 @@ protected void readBatch(int rowId, int num, WritableLongVector column) { } } + @Override + protected void skipBatch(int num) { + int left = num; + while (left > 0) { + if (runLenDecoder.currentCount == 0) { + runLenDecoder.readNextGroup(); + } + int n = Math.min(left, runLenDecoder.currentCount); + switch (runLenDecoder.mode) { + case RLE: + if (runLenDecoder.currentValue == maxDefLevel) { + skipValue(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++] + == maxDefLevel) { + skipValue(1); + } + } + break; + } + left -= n; + runLenDecoder.currentCount -= n; + } + } + + private void skipValue(int num) { + skipDataBuffer(num * 8); + } + @Override protected void readBatchFromDictionaryIds( int rowId, int num, WritableLongVector column, WritableIntVector dictionaryIds) { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java index 68225fbd1320..8f20be275447 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java @@ -279,7 +279,7 @@ private Pair readPrimitive( reader = new NestedPrimitiveColumnReader( descriptor, - pages.getPageReader(descriptor), + pages, isUtcTimestamp, descriptor.getPrimitiveType(), field.getType(), diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java index 7d00ff79234a..7db7aedbf6ae 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java @@ -44,6 +44,7 @@ import org.apache.parquet.column.page.DataPageV1; import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; @@ -82,15 +83,6 @@ public class NestedPrimitiveColumnReader implements ColumnReader valueList = new ArrayList<>(); + int valueIndex = collectDataFromParquetPage(readNumber, valueList); + + return fillColumnVector(valueIndex, valueList); + } + + private int collectDataFromParquetPage(int total, List valueList) throws IOException { + int valueIndex = 0; // repeated type need two loops to read data. 
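+        // The loop below walks rows against the current row range from readState: rows past
+        // currentRangeEnd() advance to the next range, rows before currentRangeStart() are
+        // decoded but discarded (needFilterSkip), and the inner do/while drains all repeated
+        // values of the current row (repetitionLevel != 0) before moving on.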
-        while (!eof && index < readNumber) {
+
+        readState.resetForNewBatch(total);
+
+        while (!eof && readState.rowsToReadInBatch > 0) {
+
+            if (readState.isFinished()) { // finished reading
+                eof = true;
+                break;
+            }
+
+            long pageRowId = readState.rowId;
+            long rangeStart = readState.currentRangeStart();
+            long rangeEnd = readState.currentRangeEnd();
+
+            if (pageRowId > rangeEnd) {
+                readState.nextRange();
+                continue;
+            }
+
+            boolean needFilterSkip = pageRowId < rangeStart;
+
             do {
-                if (!lastValue.shouldSkip) {
+
+                if (!lastValue.shouldSkip && !needFilterSkip) {
                     valueList.add(lastValue.value);
                     valueIndex++;
                 }
             } while (readValue() && (repetitionLevel != 0));
-            index++;
+
+            if (pageRowId == readState.rowId) {
+                readState.rowId = readState.rowId + 1;
+            }
+
+            if (!needFilterSkip) {
+                readState.rowsToReadInBatch = readState.rowsToReadInBatch - 1;
+            }
         }
-        return fillColumnVector(valueIndex, valueList);
+        return valueIndex;
     }
 
     public LevelDelegation getLevelDelegation() {
@@ -255,20 +287,24 @@ private void readAndSaveRepetitionAndDefinitionLevels() {
         // get the values of repetition and definitionLevel
         repetitionLevel = repetitionLevelColumn.nextInt();
         definitionLevel = definitionLevelColumn.nextInt();
-        valuesRead++;
+        readState.valuesToReadInPage = readState.valuesToReadInPage - 1;
         repetitionLevelList.add(repetitionLevel);
         definitionLevelList.add(definitionLevel);
     }
 
     private int readPageIfNeed() throws IOException {
         // Compute the number of values we want to read in this page.
-        int leftInPage = (int) (endOfPageValueCount - valuesRead);
-        if (leftInPage == 0) {
-            // no data left in current page, load data from new page
-            readPage();
-            leftInPage = (int) (endOfPageValueCount - valuesRead);
+        if (readState.valuesToReadInPage == 0) {
+            int pageValueCount = readPage();
+            // readPage() returns the number of values in the newly loaded page
+            if (pageValueCount < 0) {
+                // We've read all the pages; this can happen when we're reading a repeated list
+                // and we don't know where the list will end until we've seen all the pages.
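+                // A negative page count means there are no more pages to load, so report
+                // end of data to the caller.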
+ return -1; + } } - return leftInPage; + return readState.valuesToReadInPage; } private Object readPrimitiveTypedRow(DataType category) { @@ -528,33 +564,36 @@ private static HeapBytesVector getHeapBytesVector(int total, List valueList) { return phbv; } - protected void readPage() { + protected int readPage() { DataPage page = pageReader.readPage(); if (page == null) { - return; + return -1; } - page.accept( - new DataPage.Visitor() { - @Override - public Void visit(DataPageV1 dataPageV1) { - readPageV1(dataPageV1); - return null; - } + long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); - @Override - public Void visit(DataPageV2 dataPageV2) { - readPageV2(dataPageV2); - return null; - } - }); + int pageValueCount = + page.accept( + new DataPage.Visitor() { + @Override + public Integer visit(DataPageV1 dataPageV1) { + return readPageV1(dataPageV1); + } + + @Override + public Integer visit(DataPageV2 dataPageV2) { + return readPageV2(dataPageV2); + } + }); + readState.resetForNewPage(pageValueCount, pageFirstRowIndex); + return pageValueCount; } private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) throws IOException { - this.pageValueCount = valueCount; - this.endOfPageValueCount = valuesRead + pageValueCount; + // this.pageValueCount = valueCount; + // this.endOfPageValueCount = valuesRead + pageValueCount; if (dataEncoding.usesDictionary()) { this.dataColumn = null; if (dictionary == null) { @@ -577,13 +616,14 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int } try { - dataColumn.initFromPage(pageValueCount, in); + dataColumn.initFromPage(valueCount, in); } catch (IOException e) { throw new IOException(String.format("Could not read page in col %s.", descriptor), e); } } - private void readPageV1(DataPageV1 page) { + private int readPageV1(DataPageV1 page) { + int pageValueCount = page.getValueCount(); ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL); ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL); this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader); @@ -597,15 +637,16 @@ private void readPageV1(DataPageV1 page) { LOG.debug("Reading definition levels at {}.", in.position()); dlReader.initFromPage(pageValueCount, in); LOG.debug("Reading data at {}.", in.position()); - initDataReader(page.getValueEncoding(), in, page.getValueCount()); + initDataReader(page.getValueEncoding(), in, pageValueCount); + return pageValueCount; } catch (IOException e) { throw new ParquetDecodingException( String.format("Could not read page %s in col %s.", page, descriptor), e); } } - private void readPageV2(DataPageV2 page) { - this.pageValueCount = page.getValueCount(); + private int readPageV2(DataPageV2 page) { + int pageValueCount = page.getValueCount(); this.repetitionLevelColumn = newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels()); this.definitionLevelColumn = @@ -615,8 +656,8 @@ private void readPageV2(DataPageV2 page) { "Page data size {} bytes and {} records.", page.getData().size(), pageValueCount); - initDataReader( - page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount()); + initDataReader(page.getDataEncoding(), page.getData().toInputStream(), pageValueCount); + return pageValueCount; } catch (IOException e) { throw new ParquetDecodingException( String.format("Could not read page %s in col %s.", page, descriptor), e); diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
new file mode 100644
index 000000000000..a6003676825a
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/** Parquet reader state for column index. */
+public class ParquetReadState {
+    /** A special row range used when there are no row indexes (hence all rows must be included). */
+    private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE);
+
+    /**
+     * A special row range used when the row indexes are present AND all the row ranges have been
+     * processed. This serves as a sentinel at the end indicating that all rows that come after the
+     * last row range should be skipped.
+     */
+    private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE);
+
+    private final Iterator<RowRange> rowRanges;
+
+    private RowRange currentRange;
+
+    /** Row index for the next read. */
+    long rowId;
+
+    int valuesToReadInPage;
+    int rowsToReadInBatch;
+
+    public ParquetReadState(PrimitiveIterator.OfLong rowIndexes) {
+        this.rowRanges = constructRanges(rowIndexes);
+        nextRange();
+    }
+
+    /**
+     * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
+     * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: `[0-2],
+     * [4-5], [7-9]`.
+     */
+    private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
+        if (rowIndexes == null) {
+            return null;
+        }
+
+        List<RowRange> rowRanges = new ArrayList<>();
+        long currentStart = Long.MIN_VALUE;
+        long previous = Long.MIN_VALUE;
+
+        while (rowIndexes.hasNext()) {
+            long idx = rowIndexes.nextLong();
+            if (currentStart == Long.MIN_VALUE) {
+                currentStart = idx;
+            } else if (previous + 1 != idx) {
+                RowRange range = new RowRange(currentStart, previous);
+                rowRanges.add(range);
+                currentStart = idx;
+            }
+            previous = idx;
+        }
+
+        if (previous != Long.MIN_VALUE) {
+            rowRanges.add(new RowRange(currentStart, previous));
+        }
+
+        return rowRanges.iterator();
+    }
+
+    /** Must be called at the beginning of reading a new batch. */
+    void resetForNewBatch(int batchSize) {
+        this.rowsToReadInBatch = batchSize;
+    }
+
+    /** Must be called at the beginning of reading a new page.
*/ + void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) { + this.valuesToReadInPage = totalValuesInPage; + this.rowId = pageFirstRowIndex; + } + + /** Returns the start index of the current row range. */ + public long currentRangeStart() { + return currentRange.start; + } + + /** Returns the end index of the current row range. */ + public long currentRangeEnd() { + return currentRange.end; + } + + public boolean isFinished() { + return this.currentRange.equals(this.END_ROW_RANGE); + } + + public boolean isMaxRange() { + return this.currentRange.equals(this.MAX_ROW_RANGE); + } + + public RowRange getCurrentRange() { + return currentRange; + } + + /** Advance to the next range. */ + public void nextRange() { + if (rowRanges == null) { + currentRange = MAX_ROW_RANGE; + } else if (!rowRanges.hasNext()) { + currentRange = END_ROW_RANGE; + } else { + currentRange = rowRanges.next(); + } + } + + /** Helper struct to represent a range of row indexes `[start, end]`. */ + public static class RowRange { + final long start; + final long end; + + RowRange(long start, long end) { + this.start = start; + this.end = end; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RowRange)) { + return false; + } + return ((RowRange) obj).start == this.start && ((RowRange) obj).end == this.end; + } + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java index 860ec54fa88b..a2be77414d5a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java @@ -87,58 +87,45 @@ public static ColumnReader createColumnReader( getAllColumnDescriptorByType(depth, type, columnDescriptors); switch (fieldType.getTypeRoot()) { case BOOLEAN: - return new BooleanColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new BooleanColumnReader(descriptors.get(0), pages); case TINYINT: - return new ByteColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new ByteColumnReader(descriptors.get(0), pages); case DOUBLE: - return new DoubleColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new DoubleColumnReader(descriptors.get(0), pages); case FLOAT: - return new FloatColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new FloatColumnReader(descriptors.get(0), pages); case INTEGER: case DATE: case TIME_WITHOUT_TIME_ZONE: - return new IntColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new IntColumnReader(descriptors.get(0), pages); case BIGINT: - return new LongColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new LongColumnReader(descriptors.get(0), pages); case SMALLINT: - return new ShortColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new ShortColumnReader(descriptors.get(0), pages); case CHAR: case VARCHAR: case BINARY: case VARBINARY: - return new BytesColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new BytesColumnReader(descriptors.get(0), pages); case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: if (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.INT64) { - return new LongColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new LongColumnReader(descriptors.get(0), pages); } - return new TimestampColumnReader( - true, descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new TimestampColumnReader(true, descriptors.get(0), pages); case DECIMAL: switch (descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()) { case INT32: - return new IntColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new IntColumnReader(descriptors.get(0), pages); case INT64: - return new LongColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new LongColumnReader(descriptors.get(0), pages); case BINARY: - return new BytesColumnReader( - descriptors.get(0), pages.getPageReader(descriptors.get(0))); + return new BytesColumnReader(descriptors.get(0), pages); case FIXED_LEN_BYTE_ARRAY: return new FixedLenBytesColumnReader( descriptors.get(0), - pages.getPageReader(descriptors.get(0)), + pages, ((DecimalType) fieldType).getPrecision()); } case ARRAY: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java index 2dd1655d571f..ebb8f28fa1ee 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java @@ -194,6 +194,51 @@ private void readDictionaryIdData(int total, WritableIntVector c, int rowId) { } } + void skipDictionaryIds(int total, int level, RunLengthDecoder data) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) { + this.readNextGroup(); + } + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.skipDictionaryIdData(n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + data.readInteger(); + } + } + break; + } + left -= n; + currentCount -= n; + } + } + + private void skipDictionaryIdData(int total) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) { + this.readNextGroup(); + } + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + break; + case PACKED: + currentBufferIdx += n; + break; + } + left -= n; + currentCount -= n; + } + } + /** Reads the next varint encoded int. */ private int readUnsignedVarInt() throws IOException { int value = 0; diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java index 7b32232261a7..bdb2f401fa3f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java @@ -22,7 +22,7 @@ import org.apache.paimon.data.columnar.writable.WritableShortVector; import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.schema.PrimitiveType; import java.io.IOException; @@ -30,9 +30,9 @@ /** Short {@link ColumnReader}. Using INT32 to store short, so just cast int to short. 
 */
 public class ShortColumnReader extends AbstractColumnReader<WritableShortVector> {
 
-    public ShortColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+    public ShortColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
             throws IOException {
-        super(descriptor, pageReader);
+        super(descriptor, pageReadStore);
         checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
     }
 
@@ -71,6 +71,38 @@ protected void readBatch(int rowId, int num, WritableShortVector column) {
         }
     }
 
+    @Override
+    protected void skipBatch(int num) {
+        int left = num;
+        while (left > 0) {
+            if (runLenDecoder.currentCount == 0) {
+                runLenDecoder.readNextGroup();
+            }
+            int n = Math.min(left, runLenDecoder.currentCount);
+            switch (runLenDecoder.mode) {
+                case RLE:
+                    if (runLenDecoder.currentValue == maxDefLevel) {
+                        skipShort(n);
+                    }
+                    break;
+                case PACKED:
+                    for (int i = 0; i < n; ++i) {
+                        if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+                                == maxDefLevel) {
+                            skipShort(1);
+                        }
+                    }
+                    break;
+            }
+            left -= n;
+            runLenDecoder.currentCount -= n;
+        }
+    }
+
+    private void skipShort(int num) {
+        skipDataBuffer(4 * num);
+    }
+
     @Override
     protected void readBatchFromDictionaryIds(
             int rowId, int num, WritableShortVector column, WritableIntVector dictionaryIds) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
index 4a279ff90e15..8767173315c2 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
@@ -23,7 +23,7 @@
 import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
 
@@ -49,9 +49,9 @@ public class TimestampColumnReader extends AbstractColumnReader