diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index c8ca21e88ef575..d1df51177fd496 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -41,6 +41,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.security.authentication.PreExecutionAuthenticator; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.es.EsExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalDatabase; @@ -453,13 +454,13 @@ private void refreshOnlyCatalogCache(boolean invalidCache) { } } - public final Optional getSchema(String dbName, String tblName) { + public final Optional getSchema(SchemaCacheKey key) { makeSureInitialized(); - Optional> db = getDb(dbName); + Optional> db = getDb(key.getDbName()); if (db.isPresent()) { - Optional table = db.get().getTable(tblName); + Optional table = db.get().getTable(key.getTblName()); if (table.isPresent()) { - return table.get().initSchemaAndUpdateTime(); + return table.get().initSchemaAndUpdateTime(key); } } return Optional.empty(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java index cc40ad292ce182..24f55e74266863 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java @@ -31,6 +31,8 @@ import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache; import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr; import org.apache.doris.datasource.metacache.MetaCache; 
+import org.apache.doris.datasource.paimon.PaimonMetadataCache; +import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr; import org.apache.doris.fs.FileSystemCache; import org.apache.doris.nereids.exceptions.NotSupportedException; @@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr { private ExternalRowCountCache rowCountCache; private final IcebergMetadataCacheMgr icebergMetadataCacheMgr; private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr; + private final PaimonMetadataCacheMgr paimonMetadataCacheMgr; public ExternalMetaCacheMgr() { rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool( @@ -122,6 +125,7 @@ public ExternalMetaCacheMgr() { hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor); icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor); maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr(); + paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor); } public ExecutorService getFileListingExecutor() { @@ -167,6 +171,10 @@ public IcebergMetadataCache getIcebergMetadataCache() { return icebergMetadataCacheMgr.getIcebergMetadataCache(); } + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCacheMgr.getPaimonMetadataCache(); + } + public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) { return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId); } @@ -189,6 +197,7 @@ public void removeCache(long catalogId) { hudiPartitionMgr.removePartitionProcessor(catalogId); icebergMetadataCacheMgr.removeCache(catalogId); maxComputeMetadataCacheMgr.removeCache(catalogId); + paimonMetadataCacheMgr.removeCache(catalogId); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -204,6 +213,7 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName); 
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); + paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName); if (LOG.isDebugEnabled()) { LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId); } @@ -222,6 +232,7 @@ public void invalidateDbCache(long catalogId, String dbName) { hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName); icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName); maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName); + paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName); if (LOG.isDebugEnabled()) { LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId); } @@ -239,6 +250,7 @@ public void invalidateCatalogCache(long catalogId) { hudiPartitionMgr.cleanPartitionProcess(catalogId); icebergMetadataCacheMgr.invalidateCatalogCache(catalogId); maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId); + paimonMetadataCacheMgr.invalidateCatalogCache(catalogId); if (LOG.isDebugEnabled()) { LOG.debug("invalid catalog cache for {}", catalogId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java index a0558766e81400..de3eeff75d97fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java @@ -74,7 +74,7 @@ public Long getValue() { } private Optional loadSchema(SchemaCacheKey key) { - Optional schema = catalog.getSchema(key.dbName, key.tblName); + Optional schema = catalog.getSchema(key); if (LOG.isDebugEnabled()) { LOG.debug("load schema for {} in catalog {}", key, catalog.getName()); } @@ -83,6 +83,10 @@ private Optional loadSchema(SchemaCacheKey key) { public Optional getSchemaValue(String dbName, String tblName) { 
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName); + return getSchemaValue(key); + } + + public Optional getSchemaValue(SchemaCacheKey key) { return schemaCache.get(key); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index bd1e36e7bc968b..91df061678f154 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -31,6 +31,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.mvcc.MvccSnapshot; import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -317,8 +318,12 @@ public Optional getColumnStatistic(String colName) { * * @return */ - public Optional initSchemaAndUpdateTime() { + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { schemaUpdateTime = System.currentTimeMillis(); + return initSchema(key); + } + + public Optional initSchema(SchemaCacheKey key) { return initSchema(); } @@ -399,7 +404,7 @@ public SelectedPartitions initSelectedPartitions(Optional snapshot * @param snapshot if not support mvcc, ignore this * @return partitionName ==> PartitionItem */ - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index ad685386ec9e89..da4670d6d0589d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -30,6 +30,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.TablePartitionValues; @@ -330,7 +331,7 @@ public SelectedPartitions initHudiSelectedPartitions(Optional tab } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { return getNameToPartitionItems(); } @@ -501,6 +502,10 @@ public Set getPartitionNames() { } @Override + public Optional initSchemaAndUpdateTime(SchemaCacheKey key) { + return initSchemaAndUpdateTime(); + } + public Optional initSchemaAndUpdateTime() { org.apache.hadoop.hive.metastore.api.Table table = ((HMSExternalCatalog) catalog).getClient() .getTable(dbName, name); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java index 0f748f59e927bc..dbbbcf2d6a1e5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java @@ -92,7 +92,7 @@ public List getPartitionColumns() { } @Override - protected Map getNameToPartitionItems(Optional snapshot) { + public Map getNameToPartitionItems(Optional snapshot) { if (getPartitionColumns().isEmpty()) { return Collections.emptyMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java new file mode 100644 index 00000000000000..ffdaff770e21a3 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.mvcc; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.qe.ConnectContext; + +import java.util.Optional; + +public class MvccUtil { + /** + * Returns the snapshot that the current ConnectContext's StatementContext holds for a table. + * + * @param tableIf the table to look up in the statement context + * @return the snapshot from the statement context; empty if no ConnectContext or StatementContext exists + */ + public static Optional getSnapshotFromContext(TableIf tableIf) { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext == null) { + return Optional.empty(); + } + StatementContext statementContext = connectContext.getStatementContext(); + if (statementContext == null) { + return Optional.empty(); + } + return statementContext.getSnapshot(tableIf); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java index c9eaf1b7df32ef..7b59d879d9301c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java @@ -18,10 +18,16 @@ package org.apache.doris.datasource.paimon; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.ScalarType; -import org.apache.doris.catalog.Type; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ExternalAnalysisTask; @@ -30,30 +36,36 @@ import org.apache.doris.thrift.TTableType; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; -import org.apache.paimon.types.ArrayType; +import org.apache.paimon.table.system.SchemasTable; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.DecimalType; -import org.apache.paimon.types.MapType; -import org.apache.paimon.types.RowType; -import java.util.ArrayList; +import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.List; 
+import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; +import java.util.Set; -public class PaimonExternalTable extends ExternalTable { +public class PaimonExternalTable extends ExternalTable implements MvccTable { private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class); + private final Table paimonTable; + public PaimonExternalTable(long id, String name, String dbName, PaimonExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE); + this.paimonTable = catalog.getPaimonTable(dbName, name); } public String getPaimonCatalogType() { @@ -67,99 +79,27 @@ protected synchronized void makeSureInitialized() { } } - public Table getPaimonTable() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null); + public Table getPaimonTable(Optional snapshot) { + return paimonTable.copy( + Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), + String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId()))); } - @Override - public Optional initSchema() { - Table paimonTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, name); - TableSchema schema = ((FileStoreTable) paimonTable).schema(); - List columns = schema.fields(); - List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); - for (DataField field : columns) { - tmpSchema.add(new Column(field.name().toLowerCase(), - paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, - field.id())); - } - return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable)); - } - - private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { - int tsScale = 3; // default - switch (dataType.getTypeRoot()) { - case BOOLEAN: - return Type.BOOLEAN; - case INTEGER: - return Type.INT; - case BIGINT: - return Type.BIGINT; - case 
FLOAT: - return Type.FLOAT; - case DOUBLE: - return Type.DOUBLE; - case SMALLINT: - return Type.SMALLINT; - case TINYINT: - return Type.TINYINT; - case VARCHAR: - case BINARY: - case CHAR: - case VARBINARY: - return Type.STRING; - case DECIMAL: - DecimalType decimal = (DecimalType) dataType; - return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); - case DATE: - return ScalarType.createDateV2Type(); - case TIMESTAMP_WITHOUT_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.TimestampType) { - tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } else if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { - tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); - if (tsScale > 6) { - tsScale = 6; - } - } - return ScalarType.createDatetimeV2Type(tsScale); - case ARRAY: - ArrayType arrayType = (ArrayType) dataType; - Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); - return org.apache.doris.catalog.ArrayType.create(innerType, true); - case MAP: - MapType mapType = (MapType) dataType; - return new org.apache.doris.catalog.MapType( - paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); - case ROW: - RowType rowType = (RowType) dataType; - List fields = rowType.getFields(); - return new org.apache.doris.catalog.StructType(fields.stream() - .map(field -> new org.apache.doris.catalog.StructField(field.name(), - paimonTypeToDorisType(field.type()))) - .collect(Collectors.toCollection(ArrayList::new))); - case TIME_WITHOUT_TIME_ZONE: - return Type.UNSUPPORTED; 
- default: - LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); - return Type.UNSUPPORTED; + public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) { + ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog); + Optional schemaCacheValue = cache.getSchemaValue( + new PaimonSchemaCacheKey(dbName, name, schemaId)); + if (!schemaCacheValue.isPresent()) { + throw new CacheException("failed to getSchema for: %s.%s.%s.%s", + null, catalog.getName(), dbName, name, schemaId); } + return (PaimonSchemaCacheValue) schemaCacheValue.get(); } - protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { - return paimonPrimitiveTypeToDorisType(type); + private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() { + makeSureInitialized(); + return Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache() + .getPaimonSnapshot(catalog, dbName, name); } @Override @@ -189,13 +129,6 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { public long fetchRowCount() { makeSureInitialized(); long rowCount = 0; - Optional schemaCacheValue = getSchemaCacheValue(); - Table paimonTable = schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()) - .orElse(null); - if (paimonTable == null) { - LOG.info("Paimon table {} is null.", name); - return UNKNOWN_ROW_COUNT; - } List splits = paimonTable.newReadBuilder().newScan().plan().splits(); for (Split split : splits) { rowCount += split.rowCount(); @@ -205,4 +138,87 @@ public long fetchRowCount() { } return rowCount > 0 ? 
rowCount : UNKNOWN_ROW_COUNT; } + + @Override + public List getPartitionColumns(Optional snapshot) { + return getPaimonSchemaCacheValue(snapshot).getPartitionColumns(); + } + + @Override + public MvccSnapshot loadSnapshot() { + return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue()); + } + + @Override + public Map getNameToPartitionItems(Optional snapshot) { + return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); + } + + @Override + public boolean supportInternalPartitionPruned() { + return true; + } + + @Override + public List getFullSchema() { + return getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema(); + } + + @Override + public Optional initSchema(SchemaCacheKey key) { + makeSureInitialized(); + PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key; + try { + PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey); + List columns = schema.getFields(); + List dorisColumns = Lists.newArrayListWithCapacity(columns.size()); + Set partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys()); + List partitionColumns = Lists.newArrayList(); + for (DataField field : columns) { + Column column = new Column(field.name().toLowerCase(), + PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true, + field.id()); + dorisColumns.add(column); + if (partitionColumnNames.contains(field.name())) { + partitionColumns.add(column); + } + } + return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns)); + } catch (Exception e) { + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + /* keep root cause */ e, getCatalog().getName(), key.getDbName(), key.getTblName(), + paimonSchemaCacheKey.getSchemaId()); + } + } + + private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) getCatalog()).getPaimonTable(key.getDbName(), + name + Catalog.SYSTEM_TABLE_SPLITTER + 
SchemasTable.SCHEMAS); + PredicateBuilder builder = new PredicateBuilder(table.rowType()); + Predicate predicate = builder.equal(0, key.getSchemaId()); + // Adding predicates will also return excess data + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}, {2}}, predicate); + for (InternalRow row : rows) { + PaimonSchema schema = PaimonUtil.rowToSchema(row); + if (schema.getSchemaId() == key.getSchemaId()) { + return schema; + } + } + throw new CacheException("failed to initSchema for: %s.%s.%s.%s", + null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId()); + } + + private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional snapshot) { + PaimonSnapshotCacheValue snapshotCacheValue = getOrFetchSnapshotCacheValue(snapshot); + return getPaimonSchemaCacheValue(snapshotCacheValue.getSnapshot().getSchemaId()); + } + + private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { + if (snapshot.isPresent()) { + return ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getPaimonSnapshotCacheValue(); + } + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java new file mode 100644 index 00000000000000..5b711e070667b7 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.CacheFactory; +import org.apache.doris.common.Config; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.ExternalMetaCacheMgr; + +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.PartitionsTable; +import org.apache.paimon.table.system.SnapshotsTable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCache { + + private final LoadingCache snapshotCache; + + public PaimonMetadataCache(ExecutorService executor) { + CacheFactory snapshotCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); + } + + @NotNull + private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) { + 
try { + PaimonSnapshot latestSnapshot = loadLatestSnapshot(key); + PaimonExternalTable table = (PaimonExternalTable) key.getCatalog().getDbOrAnalysisException(key.getDbName()) + .getTableOrAnalysisException(key.getTableName()); + List partitionColumns = table.getPaimonSchemaCacheValue(latestSnapshot.getSchemaId()) + .getPartitionColumns(); + PaimonPartitionInfo partitionInfo = loadPartitionInfo(key, partitionColumns); + return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot); + } catch (IOException | AnalysisException e) { + throw new CacheException("failed to loadSnapshot for: %s.%s.%s", + e, key.getCatalog().getName(), key.getDbName(), key.getTableName()); + } + } + + private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key, List partitionColumns) + throws IOException, AnalysisException { + if (CollectionUtils.isEmpty(partitionColumns)) { + return new PaimonPartitionInfo(); + } + List paimonPartitions = loadPartitions(key); + return PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + } + + private List loadPartitions(PaimonSnapshotCacheKey key) + throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS); + List rows = PaimonUtil.read(table, null, null); + List res = Lists.newArrayListWithCapacity(rows.size()); + for (InternalRow row : rows) { + res.add(PaimonUtil.rowToPartition(row)); + } + return res; + } + + private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key) throws IOException { + Table table = ((PaimonExternalCatalog) key.getCatalog()).getPaimonTable(key.getDbName(), + key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS); + // snapshotId and schemaId + List rows = PaimonUtil.read(table, new int[][] {{0}, {1}}, null); + long latestSnapshotId = 0L; + long latestSchemaId = 0L; + for (InternalRow row : rows) { + long snapshotId = 
row.getLong(0); + if (snapshotId > latestSnapshotId) { + latestSnapshotId = snapshotId; + latestSchemaId = row.getLong(1); + } + } + return new PaimonSnapshot(latestSnapshotId, latestSchemaId); + } + + public void invalidateCatalogCache(long catalogId) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId) + .forEach(snapshotCache::invalidate); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName) + && key.getTableName().equals( + tblName)) + .forEach(snapshotCache::invalidate); + } + + public void invalidateDbCache(long catalogId, String dbName) { + snapshotCache.asMap().keySet().stream() + .filter(key -> key.getCatalog().getId() == catalogId && key.getDbName().equals(dbName)) + .forEach(snapshotCache::invalidate); + } + + public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog, String dbName, String tbName) { + PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(catalog, dbName, tbName); + return snapshotCache.get(key); + } + + public Map> getCacheStats() { + Map> res = Maps.newHashMap(); + res.put("paimon_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), + snapshotCache.estimatedSize())); + return res; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java new file mode 100644 index 00000000000000..a282fde665b197 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import java.util.concurrent.ExecutorService; + +public class PaimonMetadataCacheMgr { + + private final PaimonMetadataCache paimonMetadataCache; + + public PaimonMetadataCacheMgr(ExecutorService executor) { + this.paimonMetadataCache = new PaimonMetadataCache(executor); + } + + public PaimonMetadataCache getPaimonMetadataCache() { + return paimonMetadataCache; + } + + public void removeCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateCatalogCache(long catalogId) { + paimonMetadataCache.invalidateCatalogCache(catalogId); + } + + public void invalidateTableCache(long catalogId, String dbName, String tblName) { + paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName); + } + + public void invalidateDbCache(long catalogId, String dbName) { + paimonMetadataCache.invalidateDbCache(catalogId, dbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java new file mode 100644 index 00000000000000..2307e91adb3911 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +public class PaimonMvccSnapshot implements MvccSnapshot { + private final PaimonSnapshotCacheValue snapshotCacheValue; + + public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) { + this.snapshotCacheValue = snapshotCacheValue; + } + + public PaimonSnapshotCacheValue getSnapshotCacheValue() { + return snapshotCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java new file mode 100644 index 00000000000000..545448199b3375 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
/**
 * Immutable value object describing one partition of a Paimon table, mirroring the
 * columns of Paimon's partitions system table.
 *
 * <p>See https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
 */
public class PaimonPartition {
    /** Partition values, for example: {@code [1, dd]}. */
    private final String partitionValues;
    /** The amount of data (record count) in the partition. */
    private final long recordCount;
    /** Partition file size in bytes. */
    private final long fileSizeInBytes;
    /** Number of partition files. */
    private final long fileCount;
    /** Last update time of the partition. */
    private final long lastUpdateTime;

    /**
     * @param partitionValues partition values rendered as a string, e.g. {@code [1, dd]}
     * @param recordCount     number of records in the partition
     * @param fileSizeInBytes total file size of the partition
     * @param fileCount       number of files in the partition
     * @param lastUpdateTime  last update time of the partition
     */
    public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
            long lastUpdateTime) {
        this.partitionValues = partitionValues;
        this.recordCount = recordCount;
        this.fileSizeInBytes = fileSizeInBytes;
        this.fileCount = fileCount;
        this.lastUpdateTime = lastUpdateTime;
    }

    public String getPartitionValues() {
        return partitionValues;
    }

    public long getRecordCount() {
        return recordCount;
    }

    public long getFileSizeInBytes() {
        return fileSizeInBytes;
    }

    public long getFileCount() {
        return fileCount;
    }

    public long getLastUpdateTime() {
        return lastUpdateTime;
    }
}
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class PaimonPartitionInfo { + private final Map nameToPartitionItem; + private final Map nameToPartition; + + public PaimonPartitionInfo() { + this.nameToPartitionItem = Maps.newHashMap(); + this.nameToPartition = Maps.newHashMap(); + } + + public PaimonPartitionInfo(Map nameToPartitionItem, + Map nameToPartition) { + this.nameToPartitionItem = nameToPartitionItem; + this.nameToPartition = nameToPartition; + } + + public Map getNameToPartitionItem() { + return nameToPartitionItem; + } + + public Map getNameToPartition() { + return nameToPartition; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java new file mode 100644 index 00000000000000..ef26e1ed20879d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.paimon.types.DataField; + +import java.util.List; + +public class PaimonSchema { + private final long schemaId; + private final List fields; + private final List partitionKeys; + + public PaimonSchema(long schemaId, List fields, List partitionKeys) { + this.schemaId = schemaId; + this.fields = fields; + this.partitionKeys = partitionKeys; + } + + public long getSchemaId() { + return schemaId; + } + + public List getFields() { + return fields; + } + + public List getPartitionKeys() { + return partitionKeys; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java new file mode 100644 index 00000000000000..f74555b369b380 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; + +import com.google.common.base.Objects; + +public class PaimonSchemaCacheKey extends SchemaCacheKey { + private final long schemaId; + + public PaimonSchemaCacheKey(String dbName, String tableName, long schemaId) { + super(dbName, tableName); + this.schemaId = schemaId; + } + + public long getSchemaId() { + return schemaId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PaimonSchemaCacheKey)) { + return false; + } + if (!super.equals(o)) { + return false; + } + PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o; + return schemaId == that.schemaId; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), schemaId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java index aaaefe7f32db2b..ccb530a3cbccc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java @@ -20,20 +20,18 @@ import org.apache.doris.catalog.Column; import org.apache.doris.datasource.SchemaCacheValue; -import 
org.apache.paimon.table.Table; - import java.util.List; public class PaimonSchemaCacheValue extends SchemaCacheValue { - private Table paimonTable; + private List partitionColumns; - public PaimonSchemaCacheValue(List schema, Table paimonTable) { + public PaimonSchemaCacheValue(List schema, List partitionColumns) { super(schema); - this.paimonTable = paimonTable; + this.partitionColumns = partitionColumns; } - public Table getPaimonTable() { - return paimonTable; + public List getPartitionColumns() { + return partitionColumns; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java new file mode 100644 index 00000000000000..4a536dd72cc901 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
/**
 * Immutable pair of a Paimon snapshot id and a schema id.
 */
public class PaimonSnapshot {
    private final long snapshotId;
    private final long schemaId;

    /**
     * @param snapshotId id of the snapshot
     * @param schemaId   id of the schema stored alongside the snapshot id
     */
    public PaimonSnapshot(long snapshotId, long schemaId) {
        this.snapshotId = snapshotId;
        this.schemaId = schemaId;
    }

    public long getSnapshotId() {
        return snapshotId;
    }

    public long getSchemaId() {
        return schemaId;
    }
}
+ +package org.apache.doris.datasource.paimon; + +import org.apache.doris.datasource.CatalogIf; + +import java.util.Objects; +import java.util.StringJoiner; + +public class PaimonSnapshotCacheKey { + private final CatalogIf catalog; + private final String dbName; + private final String tableName; + + public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String tableName) { + this.catalog = catalog; + this.dbName = dbName; + this.tableName = tableName; + } + + public CatalogIf getCatalog() { + return catalog; + } + + public String getDbName() { + return dbName; + } + + public String getTableName() { + return tableName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o; + return catalog.getId() == that.catalog.getId() + && Objects.equals(dbName, that.dbName) + && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(catalog.getId(), dbName, tableName); + } + + @Override + public String toString() { + return new StringJoiner(", ", PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]") + .add("catalog=" + catalog) + .add("dbName='" + dbName + "'") + .add("tableName='" + tableName + "'") + .toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java new file mode 100644 index 00000000000000..c50ecdabfde3df --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +public class PaimonSnapshotCacheValue { + + private final PaimonPartitionInfo partitionInfo; + private final PaimonSnapshot snapshot; + + public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo, PaimonSnapshot snapshot) { + this.partitionInfo = partitionInfo; + this.snapshot = snapshot; + } + + public PaimonPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public PaimonSnapshot getSnapshot() { + return snapshot; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java new file mode 100644 index 00000000000000..1f7576dca51d93 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.paimon; + +import org.apache.doris.analysis.PartitionValue; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.hive.HiveUtil; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalRowSerializer; +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Projection; + +import java.io.IOException; +import 
java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class PaimonUtil { + private static final Logger LOG = LogManager.getLogger(PaimonUtil.class); + + public static List read( + Table table, @Nullable int[][] projection, @Nullable Predicate predicate, + Pair, String>... dynamicOptions) + throws IOException { + Map options = new HashMap<>(); + for (Pair, String> pair : dynamicOptions) { + options.put(pair.getKey().key(), pair.getValue()); + } + table = table.copy(options); + ReadBuilder readBuilder = table.newReadBuilder(); + if (projection != null) { + readBuilder.withProjection(projection); + } + if (predicate != null) { + readBuilder.withFilter(predicate); + } + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + InternalRowSerializer serializer = + new InternalRowSerializer( + projection == null + ? table.rowType() + : Projection.of(projection).project(table.rowType())); + List rows = new ArrayList<>(); + reader.forEachRemaining(row -> rows.add(serializer.copy(row))); + return rows; + } + + + /* + https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table + +---------------+----------------+--------------------+--------------------+------------------------+ + | partition | record_count | file_size_in_bytes| file_count| last_update_time| + +---------------+----------------+--------------------+--------------------+------------------------+ + | [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400| + +---------------+----------------+--------------------+--------------------+------------------------+ + org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE + public static final RowType TABLE_TYPE = + new RowType( + Arrays.asList( + new DataField(0, "partition", SerializationUtils.newStringType(true)), + new DataField(1, "record_count", new BigIntType(false)), + new DataField(2, 
"file_size_in_bytes", new BigIntType(false)), + new DataField(3, "file_count", new BigIntType(false)), + new DataField(4, "last_update_time", DataTypes.TIMESTAMP_MILLIS()))); + */ + public static PaimonPartition rowToPartition(InternalRow row) { + String partition = row.getString(0).toString(); + long recordCount = row.getLong(1); + long fileSizeInBytes = row.getLong(2); + long fileCount = row.getLong(3); + long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond(); + return new PaimonPartition(partition, recordCount, fileSizeInBytes, fileCount, lastUpdateTime); + } + + public static PaimonPartitionInfo generatePartitionInfo(List partitionColumns, + List paimonPartitions) throws AnalysisException { + Map nameToPartitionItem = Maps.newHashMap(); + Map nameToPartition = Maps.newHashMap(); + PaimonPartitionInfo partitionInfo = new PaimonPartitionInfo(nameToPartitionItem, nameToPartition); + if (CollectionUtils.isEmpty(partitionColumns)) { + return partitionInfo; + } + for (PaimonPartition paimonPartition : paimonPartitions) { + String partitionName = getPartitionName(partitionColumns, paimonPartition.getPartitionValues()); + nameToPartition.put(partitionName, paimonPartition); + nameToPartitionItem.put(partitionName, toListPartitionItem(partitionName, partitionColumns)); + } + return partitionInfo; + } + + private static String getPartitionName(List partitionColumns, String partitionValueStr) { + Preconditions.checkNotNull(partitionValueStr); + String[] partitionValues = partitionValueStr.replace("[", "").replace("]", "") + .split(","); + Preconditions.checkState(partitionColumns.size() == partitionValues.length); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < partitionColumns.size(); ++i) { + if (i != 0) { + sb.append("/"); + } + sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]); + } + return sb.toString(); + } + + public static ListPartitionItem toListPartitionItem(String partitionName, List partitionColumns) + 
throws AnalysisException { + List types = partitionColumns.stream() + .map(Column::getType) + .collect(Collectors.toList()); + // Partition name will be in format: nation=cn/city=beijing + // parse it to get values "cn" and "beijing" + List partitionValues = HiveUtil.toPartitionValues(partitionName); + Preconditions.checkState(partitionValues.size() == types.size(), partitionName + " vs. " + types); + List values = Lists.newArrayListWithExpectedSize(types.size()); + for (String partitionValue : partitionValues) { + // null will in partition 'null' + // "null" will in partition 'null' + // NULL will in partition 'null' + // "NULL" will in partition 'NULL' + // values.add(new PartitionValue(partitionValue, "null".equals(partitionValue))); + values.add(new PartitionValue(partitionValue, false)); + } + PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true); + ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key)); + return listPartitionItem; + } + + private static Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) { + int tsScale = 3; // default + switch (dataType.getTypeRoot()) { + case BOOLEAN: + return Type.BOOLEAN; + case INTEGER: + return Type.INT; + case BIGINT: + return Type.BIGINT; + case FLOAT: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + case SMALLINT: + return Type.SMALLINT; + case TINYINT: + return Type.TINYINT; + case VARCHAR: + case BINARY: + case CHAR: + case VARBINARY: + return Type.STRING; + case DECIMAL: + DecimalType decimal = (DecimalType) dataType; + return ScalarType.createDecimalV3Type(decimal.getPrecision(), decimal.getScale()); + case DATE: + return ScalarType.createDateV2Type(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.TimestampType) { + tsScale = ((org.apache.paimon.types.TimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } else if (dataType instanceof 
org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (dataType instanceof org.apache.paimon.types.LocalZonedTimestampType) { + tsScale = ((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision(); + if (tsScale > 6) { + tsScale = 6; + } + } + return ScalarType.createDatetimeV2Type(tsScale); + case ARRAY: + ArrayType arrayType = (ArrayType) dataType; + Type innerType = paimonPrimitiveTypeToDorisType(arrayType.getElementType()); + return org.apache.doris.catalog.ArrayType.create(innerType, true); + case MAP: + MapType mapType = (MapType) dataType; + return new org.apache.doris.catalog.MapType( + paimonTypeToDorisType(mapType.getKeyType()), paimonTypeToDorisType(mapType.getValueType())); + case ROW: + RowType rowType = (RowType) dataType; + List fields = rowType.getFields(); + return new org.apache.doris.catalog.StructType(fields.stream() + .map(field -> new org.apache.doris.catalog.StructField(field.name(), + paimonTypeToDorisType(field.type()))) + .collect(Collectors.toCollection(ArrayList::new))); + case TIME_WITHOUT_TIME_ZONE: + return Type.UNSUPPORTED; + default: + LOG.warn("Cannot transform unknown type: " + dataType.getTypeRoot()); + return Type.UNSUPPORTED; + } + } + + public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType type) { + return paimonPrimitiveTypeToDorisType(type); + } + + /** + * https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table + * demo: + * 0 + * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"}, + * {"id":1,"name":"item_id","type":"BIGINT"}, + * {"id":2,"name":"behavior","type":"STRING"}, + * {"id":3,"name":"dt","type":"STRING NOT NULL"}, + * {"id":4,"name":"hh","type":"STRING NOT NULL"}] + * ["dt"] + * ["dt","hh","user_id"] + * 
{"owner":"hadoop","provider":"paimon"} + * 2024-12-03 15:38:14.734 + * + * @param row + * @return + */ + public static PaimonSchema rowToSchema(InternalRow row) { + long schemaId = row.getLong(0); + String fieldsStr = row.getString(1).toString(); + String partitionKeysStr = row.getString(2).toString(); + List fields = JsonSerdeUtil.fromJson(fieldsStr, new TypeReference>() { + }); + List partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr, new TypeReference>() { + }); + return new PaimonSchema(schemaId, fields, partitionKeys); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 885eba06ed956d..a8bb814f1d353b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; import org.apache.doris.thrift.TFileAttributes; @@ -36,7 +37,7 @@ public class PaimonSource { public PaimonSource(TupleDescriptor desc) { this.desc = desc; this.paimonExtTable = (PaimonExternalTable) desc.getTable(); - this.originTable = paimonExtTable.getPaimonTable(); + this.originTable = paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable)); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 9d77ac6a6e61c3..42eee425269e96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -30,6 +30,9 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccTableInfo; import org.apache.doris.job.common.TaskStatus; import org.apache.doris.job.exception.JobException; import org.apache.doris.job.task.AbstractTask; @@ -72,6 +75,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.UUID; @@ -143,6 +147,8 @@ public enum MTMVTaskRefreshMode { private StmtExecutor executor; private Map partitionSnapshots; + private final Map snapshots = Maps.newHashMap(); + public MTMVTask() { } @@ -231,6 +237,9 @@ private void exec(Set refreshPartitionNames, throws Exception { ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); + for (Entry entry : snapshots.entrySet()) { + statementContext.setSnapshot(entry.getKey(), entry.getValue()); + } ctx.setStatementContext(statementContext); TUniqueId queryId = generateQueryId(); lastQueryId = DebugUtil.printId(queryId); @@ -318,6 +327,11 @@ private void beforeMTMVRefresh() throws AnalysisException, DdlException { MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf; baseTableIf.beforeMTMVRefresh(mtmv); } + if (tableIf instanceof MvccTable) { + MvccTable mvccTable = (MvccTable) tableIf; + MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot(); + snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index d76451d963762c..4cc4a6c8600680 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -622,7 +622,11 @@ public void addPlannerHook(PlannerHook plannerHook) { public void loadSnapshots() { for (TableIf tableIf : tables.values()) { if (tableIf instanceof MvccTable) { - snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot()); + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + // may be set by MTMV, we can not load again + if (!snapshots.containsKey(mvccTableInfo)) { + snapshots.put(mvccTableInfo, ((MvccTable) tableIf).loadSnapshot()); + } } } } @@ -630,11 +634,25 @@ public void loadSnapshots() { /** * Obtain snapshot information of mvcc * - * @param mvccTable mvccTable + * @param tableIf tableIf * @return MvccSnapshot */ - public MvccSnapshot getSnapshot(MvccTable mvccTable) { - return snapshots.get(new MvccTableInfo(mvccTable)); + public Optional getSnapshot(TableIf tableIf) { + if (!(tableIf instanceof MvccTable)) { + return Optional.empty(); + } + MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf); + return Optional.ofNullable(snapshots.get(mvccTableInfo)); + } + + /** + * Set snapshot information of mvcc + * + * @param mvccTableInfo mvccTableInfo + * @param snapshot snapshot + */ + public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot snapshot) { + snapshots.put(mvccTableInfo, snapshot); } private static class CloseableResource implements Closeable { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java index ba8b270d1f397d..e99906f5e13dc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java @@ -36,7 +36,6 @@ import java.util.ArrayList; import
java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; @@ -74,8 +73,8 @@ public Rule build() { private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, LogicalFilter filter, LogicalFileScan scan, CascadesContext ctx) { Map selectedPartitionItems = Maps.newHashMap(); - // todo: real snapshotId - if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) { + if (CollectionUtils.isEmpty(externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)))) { // non partitioned table, return NOT_PRUNED. // non partition table will be handled in HiveScanNode. return SelectedPartitions.NOT_PRUNED; @@ -83,8 +82,8 @@ private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable, Map scanOutput = scan.getOutput() .stream() .collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity())); - // todo: real snapshotId - List partitionSlots = externalTable.getPartitionColumns(Optional.empty()) + List partitionSlots = externalTable.getPartitionColumns( + ctx.getStatementContext().getSnapshot(externalTable)) .stream() .map(column -> scanOutput.get(column.getName().toLowerCase())) .collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 36cc0f95a77a8e..b0a95ffdd3aab1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -28,6 +28,8 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import 
org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.nereids.analyzer.UnboundRelation; @@ -316,6 +318,11 @@ public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, partitionHasDataItems.add( ((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId())); } + if (targetTable instanceof ExternalTable) { + // Add filter only when partition has data when external table + partitionHasDataItems.add(((ExternalTable) targetTable).getNameToPartitionItems( + MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName)); + } } if (partitionHasDataItems.isEmpty()) { predicates.setNeedAddFilter(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java index 96b8e032d11274..1f5f71f7bafe59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.TableSample; @@ -60,10 +61,10 @@ protected LogicalFileScan(RelationId id, ExternalTable table, List quali } public LogicalFileScan(RelationId id, ExternalTable table, List qualifier, - Optional tableSample, Optional tableSnapshot) { - // todo: real snapshotId + Optional tableSample, Optional tableSnapshot) { this(id, table, qualifier, Optional.empty(), Optional.empty(), - table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot); + 
table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)), + tableSample, tableSnapshot); } public SelectedPartitions getSelectedPartitions() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java new file mode 100644 index 00000000000000..789af7bf8357ac --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.mtmv; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.paimon.PaimonPartition; +import org.apache.doris.datasource.paimon.PaimonPartitionInfo; +import org.apache.doris.datasource.paimon.PaimonUtil; + +import com.google.common.collect.Lists; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.Timestamp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +public class PaimonUtilTest { + + @Test + public void testGeneratePartitionInfo() throws AnalysisException { + Column k1 = new Column("k1", PrimitiveType.INT); + Column k2 = new Column("k2", PrimitiveType.VARCHAR); + List partitionColumns = Lists.newArrayList(k1, k2); + PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); + List paimonPartitions = Lists.newArrayList(p1); + PaimonPartitionInfo partitionInfo = PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions); + String expectPartitionName = "k1=1/k2=aa"; + Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName)); + PartitionItem partitionItem = partitionInfo.getNameToPartitionItem().get(expectPartitionName); + List keys = partitionItem.getItems(); + Assert.assertEquals(1, keys.size()); + PartitionKey partitionKey = keys.get(0); + List exprs = partitionKey.getKeys(); + Assert.assertEquals(2, exprs.size()); + Assert.assertEquals(1, exprs.get(0).getLongValue()); + Assert.assertEquals("aa", exprs.get(1).getStringValue()); + } + + @Test + public void testRowToPartition() { + GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L, 3L, 4L, Timestamp.fromEpochMillis(5L)); + PaimonPartition paimonPartition = 
PaimonUtil.rowToPartition(row); + Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues()); + Assert.assertEquals(2L, paimonPartition.getRecordCount()); + Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes()); + Assert.assertEquals(4L, paimonPartition.getFileCount()); + Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); + } +} diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out b/regression-test/data/mtmv_p0/test_paimon_mtmv.out deleted file mode 100644 index c654cb01214f57..00000000000000 --- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out +++ /dev/null @@ -1,9 +0,0 @@ --- This file is automatically generated. You should know what you did if you want to edit this --- !catalog -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 - --- !mtmv -- -1 2 3 4 5 6 7 8 9.1 10.1 11.10 2020-02-02 13str 14varchar a true aaaa 2023-08-13T09:32:38.530 -10 20 30 40 50 60 70 80 90.1 100.1 110.10 2020-03-02 130str 140varchar b false bbbb 2023-08-14T08:32:52.821 - diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy deleted file mode 100644 index e84eb497b2c7b1..00000000000000 --- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy +++ /dev/null @@ -1,62 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite("test_paimon_mtmv", "p0,external,paimon,external_docker,external_docker_hive") { - String enabled = context.config.otherConfigs.get("enablePaimonTest") - logger.info("enabled: " + enabled) - String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") - logger.info("externalEnvIp: " + externalEnvIp) - String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort") - logger.info("hdfs_port: " + hdfs_port) - if (enabled != null && enabled.equalsIgnoreCase("true")) { - String catalog_name = "paimon_mtmv_catalog"; - String mvName = "test_paimon_mtmv" - String dbName = "regression_test_mtmv_p0" - String paimonDb = "db1" - String paimonTable = "all_table" - sql """drop catalog if exists ${catalog_name} """ - - sql """create catalog if not exists ${catalog_name} properties ( - "type" = "paimon", - "paimon.catalog.type"="filesystem", - "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" - );""" - - order_qt_catalog """select * from ${catalog_name}.${paimonDb}.${paimonTable}""" - sql """drop materialized view if exists ${mvName};""" - - sql """ - CREATE MATERIALIZED VIEW ${mvName} - BUILD DEFERRED REFRESH AUTO ON MANUAL - DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS - SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable}; - """ - - sql """ - REFRESH MATERIALIZED VIEW ${mvName} complete - """ - def jobName = getJobName(dbName, mvName); - waitingMTMVTaskFinished(jobName) - order_qt_mtmv "SELECT * FROM ${mvName}" - - sql """drop materialized view if exists ${mvName};""" - sql """ 
drop catalog if exists ${catalog_name} """ - } -} -