From 6d6ff2d56dca970756b9b78778cf80c772b35f2d Mon Sep 17 00:00:00 2001
From: James
Date: Tue, 24 Dec 2024 11:58:51 +0800
Subject: [PATCH] [feature](mtmv) Support Iceberg MTMV query. (#45659)

### What problem does this PR solve?

1. Implement the MvccTable interface for IcebergExternalTable.
2. IcebergExternalTable overrides the methods in ExternalTable and supports
   partition pruning.
3. Add a snapshot cache to IcebergMetadataCache to store IcebergExternalTable
   partition info.

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

### Release note

None
---
 .../doris/catalog/RangePartitionItem.java     |   7 +-
 .../datasource/hive/HMSExternalTable.java     |   2 +-
 .../iceberg/IcebergExternalTable.java         | 171 +++++++++++-------
 .../iceberg/IcebergMetadataCache.java         |  42 ++++-
 .../iceberg/IcebergMvccSnapshot.java          |  32 ++++
 .../iceberg/IcebergSchemaCacheKey.java        |  55 ++++++
 .../iceberg/IcebergSchemaCacheValue.java      |  15 +-
 .../datasource/iceberg/IcebergSnapshot.java   |  36 ++++
 .../iceberg/IcebergSnapshotCacheValue.java    |  37 ++++
 .../datasource/iceberg/IcebergUtils.java      |  14 +-
 .../iceberg/IcebergExternalTableTest.java     |  48 +++--
 .../data/mtmv_p0/test_iceberg_mtmv.out        |  15 ++
 .../suites/mtmv_p0/test_iceberg_mtmv.groovy   |  56 ++++++
 13 files changed, 423 insertions(+), 107 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 96bf0097c28a51..cad6ca38130420 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -65,14 +65,9 @@ public boolean isDefaultPartition() {
 
     @Override
     public PartitionKeyDesc toPartitionKeyDesc() {
-        if (partitionKeyRange.hasLowerBound()) {
-            return PartitionKeyDesc.createFixed(
+        return PartitionKeyDesc.createFixed(
                 PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
                 PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
-        } else {
-            // For null partition value.
- return PartitionKeyDesc.createLessThan(PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint())); - } } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java index da4670d6d0589d..a6fb486bed9c65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java @@ -543,7 +543,7 @@ public Optional initSchema() { } private List getIcebergSchema() { - return IcebergUtils.getSchema(catalog, dbName, name); + return IcebergUtils.getSchema(catalog, dbName, name, IcebergUtils.UNKNOWN_SNAPSHOT_ID); } private List getHudiSchema() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index e259399f63740b..7f7d2fdf578292 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -27,9 +27,14 @@ import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.CacheException; +import org.apache.doris.datasource.ExternalSchemaCache; +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.SchemaCacheValue; import org.apache.doris.datasource.mvcc.MvccSnapshot; +import org.apache.doris.datasource.mvcc.MvccTable; +import org.apache.doris.datasource.mvcc.MvccUtil; import org.apache.doris.mtmv.MTMVBaseTableIf; import org.apache.doris.mtmv.MTMVRefreshContext; import org.apache.doris.mtmv.MTMVRelatedTableIf; @@ -77,7 +82,7 @@ import java.util.Set; import java.util.stream.Collectors; -public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf { +public class IcebergExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable { public static final String YEAR = "year"; public static final String MONTH = "month"; @@ -117,39 +122,23 @@ public void setPartitionColumns(List partitionColumns) { } @Override - public Optional initSchema() { - table = IcebergUtils.getIcebergTable(catalog, dbName, name); - List schema = IcebergUtils.getSchema(catalog, dbName, name); - Snapshot snapshot = table.currentSnapshot(); - if (snapshot == null) { - LOG.debug("Table {} is empty", name); - return Optional.of(new IcebergSchemaCacheValue(schema, null, -1, null)); - } - long snapshotId = snapshot.snapshotId(); - partitionColumns = null; - IcebergPartitionInfo partitionInfo = null; - if (isValidRelatedTable()) { - PartitionSpec spec = table.spec(); - partitionColumns = Lists.newArrayList(); - - // For iceberg table, we only support table with 1 partition column as RelatedTable. - // So we use spec.fields().get(0) to get the partition column. 
- Types.NestedField col = table.schema().findField(spec.fields().get(0).sourceId()); + public Optional initSchema(SchemaCacheKey key) { + table = getIcebergTable(); + List schema = IcebergUtils.getSchema(catalog, dbName, name, + ((IcebergSchemaCacheKey) key).getSchemaId()); + List tmpColumns = Lists.newArrayList(); + PartitionSpec spec = table.spec(); + for (PartitionField field : spec.fields()) { + Types.NestedField col = table.schema().findField(field.sourceId()); for (Column c : schema) { if (c.getName().equalsIgnoreCase(col.name())) { - partitionColumns.add(c); + tmpColumns.add(c); break; } } - Preconditions.checkState(partitionColumns.size() == 1, - "Support 1 partition column for iceberg table, but found " + partitionColumns.size()); - try { - partitionInfo = loadPartitionInfo(); - } catch (AnalysisException e) { - LOG.warn("Failed to load iceberg table {} partition info.", name, e); - } } - return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns, snapshotId, partitionInfo)); + partitionColumns = tmpColumns; + return Optional.of(new IcebergSchemaCacheValue(schema, partitionColumns)); } @Override @@ -187,6 +176,11 @@ public Table getIcebergTable() { return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName()); } + private IcebergSnapshotCacheValue getIcebergSnapshotCacheValue() { + return Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache() + .getSnapshotCache(catalog, dbName, name); + } + @Override public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { Env.getCurrentEnv().getRefreshManager() @@ -195,46 +189,36 @@ public void beforeMTMVRefresh(MTMV mtmv) throws DdlException { @Override public Map getAndCopyPartitionItems(Optional snapshot) { - return Maps.newHashMap(getPartitionInfoFromCache().getNameToPartitionItem()); + return Maps.newHashMap(getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem()); } - private IcebergPartitionInfo getPartitionInfoFromCache() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - return new IcebergPartitionInfo(); - } - return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getPartitionInfo(); + @Override + public Map getNameToPartitionItems(Optional snapshot) { + return getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem(); } @Override public PartitionType getPartitionType(Optional snapshot) { - makeSureInitialized(); return isValidRelatedTable() ? 
PartitionType.RANGE : PartitionType.UNPARTITIONED; } @Override public Set getPartitionColumnNames(Optional snapshot) throws DdlException { - return getPartitionColumnsFromCache().stream().map(Column::getName).collect(Collectors.toSet()); + return getPartitionColumns(snapshot).stream().map(Column::getName).collect(Collectors.toSet()); } @Override public List getPartitionColumns(Optional snapshot) { - return getPartitionColumnsFromCache(); - } - - private List getPartitionColumnsFromCache() { - makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - return schemaCacheValue - .map(cacheValue -> ((IcebergSchemaCacheValue) cacheValue).getPartitionColumns()) - .orElseGet(Lists::newArrayList); + IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot); + IcebergSchemaCacheValue schemaValue = getIcebergSchemaCacheValue(snapshotValue.getSnapshot().getSchemaId()); + return schemaValue.getPartitionColumns(); } @Override public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - long latestSnapshotId = getPartitionInfoFromCache().getLatestSnapshotId(partitionName); + IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot); + long latestSnapshotId = snapshotValue.getPartitionInfo().getLatestSnapshotId(partitionName); if (latestSnapshotId <= 0) { throw new AnalysisException("can not find partition: " + partitionName); } @@ -244,16 +228,9 @@ public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshCont @Override public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional snapshot) throws AnalysisException { - return new MTMVVersionSnapshot(getLatestSnapshotIdFromCache()); - } - - public long getLatestSnapshotIdFromCache() throws AnalysisException { makeSureInitialized(); - Optional schemaCacheValue = getSchemaCacheValue(); - if (!schemaCacheValue.isPresent()) { - throw new AnalysisException("Can't find schema cache of table " + name); - } - return ((IcebergSchemaCacheValue) schemaCacheValue.get()).getSnapshotId(); + IcebergSnapshotCacheValue snapshotValue = getOrFetchSnapshotCacheValue(snapshot); + return new MTMVVersionSnapshot(snapshotValue.getSnapshot().getSnapshotId()); } @Override @@ -268,11 +245,13 @@ public boolean isPartitionColumnAllowNull() { */ @Override public boolean isValidRelatedTable() { + makeSureInitialized(); if (isValidRelatedTableCached) { return isValidRelatedTable; } isValidRelatedTable = false; Set allFields = Sets.newHashSet(); + table = getIcebergTable(); for (PartitionSpec spec : table.specs().values()) { if (spec == null) { isValidRelatedTableCached = true; @@ -299,14 +278,62 @@ public boolean isValidRelatedTable() { return isValidRelatedTable; } - protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException { - List icebergPartitions = loadIcebergPartition(); + @Override + public MvccSnapshot loadSnapshot() { + return new IcebergMvccSnapshot(getIcebergSnapshotCacheValue()); + } + + public long getLatestSnapshotId() { + table = getIcebergTable(); + Snapshot snapshot = table.currentSnapshot(); + return snapshot == null ? IcebergUtils.UNKNOWN_SNAPSHOT_ID : table.currentSnapshot().snapshotId(); + } + + public long getSchemaId(long snapshotId) { + table = getIcebergTable(); + return snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID + ? 
IcebergUtils.UNKNOWN_SNAPSHOT_ID
+                : table.snapshot(snapshotId).schemaId();
+    }
+
+    @Override
+    public List<Column> getFullSchema() {
+        Optional<MvccSnapshot> snapshotFromContext = MvccUtil.getSnapshotFromContext(this);
+        IcebergSnapshotCacheValue cacheValue = getOrFetchSnapshotCacheValue(snapshotFromContext);
+        return getIcebergSchemaCacheValue(cacheValue.getSnapshot().getSchemaId()).getSchema();
+    }
+
+    @Override
+    public boolean supportInternalPartitionPruned() {
+        return true;
+    }
+
+    public IcebergSchemaCacheValue getIcebergSchemaCacheValue(long schemaId) {
+        ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> schemaCacheValue = cache.getSchemaValue(
+                new IcebergSchemaCacheKey(dbName, name, schemaId));
+        if (!schemaCacheValue.isPresent()) {
+            throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                    null, catalog.getName(), dbName, name, schemaId);
+        }
+        return (IcebergSchemaCacheValue) schemaCacheValue.get();
+    }
+
+    public IcebergPartitionInfo loadPartitionInfo(long snapshotId) throws AnalysisException {
+        // snapshotId == UNKNOWN_SNAPSHOT_ID means the table is empty and does not contain any snapshot yet.
+        if (!isValidRelatedTable() || snapshotId == IcebergUtils.UNKNOWN_SNAPSHOT_ID) {
+            return new IcebergPartitionInfo();
+        }
+        List<IcebergPartition> icebergPartitions = loadIcebergPartition(snapshotId);
         Map<String, IcebergPartition> nameToPartition = Maps.newHashMap();
         Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+        table = getIcebergTable();
+        partitionColumns = getIcebergSchemaCacheValue(table.snapshot(snapshotId).schemaId()).getPartitionColumns();
         for (IcebergPartition partition : icebergPartitions) {
             nameToPartition.put(partition.getPartitionName(), partition);
             String transform = table.specs().get(partition.getSpecId()).fields().get(0).transform().toString();
-            Range<PartitionKey> partitionRange = getPartitionRange(partition.getPartitionValues().get(0), transform);
+            Range<PartitionKey> partitionRange = getPartitionRange(
+                    partition.getPartitionValues().get(0), transform, partitionColumns);
             PartitionItem item = new RangePartitionItem(partitionRange);
             nameToPartitionItem.put(partition.getPartitionName(), item);
         }
@@ -314,11 +341,11 @@ protected IcebergPartitionInfo loadPartitionInfo() throws AnalysisException {
         return new IcebergPartitionInfo(nameToPartitionItem, nameToPartition, partitionNameMap);
     }
 
-    public List<IcebergPartition> loadIcebergPartition() {
+    public List<IcebergPartition> loadIcebergPartition(long snapshotId) {
         PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
                 .createMetadataTableInstance(table, MetadataTableType.PARTITIONS);
         List<IcebergPartition> partitions = Lists.newArrayList();
-        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().planFiles()) {
+        try (CloseableIterable<FileScanTask> tasks = partitionsTable.newScan().useSnapshot(snapshotId).planFiles()) {
             for (FileScanTask task : tasks) {
                 CloseableIterable<StructLike> rows = task.asDataTask().rows();
                 for (StructLike row : rows) {
@@ -344,6 +371,7 @@ public IcebergPartition generateIcebergPartition(StructLike row) {
         // 8. equality_delete_file_count,
         // 9. last_updated_at,
         // 10.
last_updated_snapshot_id + table = getIcebergTable(); Preconditions.checkState(!table.spec().fields().isEmpty(), table.name() + " is not a partition table."); int specId = row.get(1, Integer.class); PartitionSpec partitionSpec = table.specs().get(specId); @@ -382,13 +410,14 @@ public IcebergPartition generateIcebergPartition(StructLike row) { } @VisibleForTesting - public Range getPartitionRange(String value, String transform) + public Range getPartitionRange(String value, String transform, List partitionColumns) throws AnalysisException { - // For NULL value, create a lessThan partition for it. + // For NULL value, create a minimum partition for it. if (value == null) { - PartitionKey nullKey = PartitionKey.createPartitionKey( - Lists.newArrayList(new PartitionValue("0000-01-02")), partitionColumns); - return Range.lessThan(nullKey); + PartitionKey nullLowKey = PartitionKey.createPartitionKey( + Lists.newArrayList(new PartitionValue("0000-01-01")), partitionColumns); + PartitionKey nullUpKey = nullLowKey.successor(); + return Range.closedOpen(nullLowKey, nullUpKey); } LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime(); LocalDateTime target; @@ -525,4 +554,12 @@ public boolean validRelatedTableCache() { public void setIsValidRelatedTableCached(boolean isCached) { this.isValidRelatedTableCached = isCached; } + + private IcebergSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional snapshot) { + if (snapshot.isPresent()) { + return ((IcebergMvccSnapshot) snapshot.get()).getSnapshotCacheValue(); + } else { + return getIcebergSnapshotCacheValue(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index ad347ca78f2a4f..e80a013cc92195 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CacheFactory; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; @@ -49,6 +50,7 @@ public class IcebergMetadataCache { private final LoadingCache> snapshotListCache; private final LoadingCache tableCache; + private final LoadingCache snapshotCache; public IcebergMetadataCache(ExecutorService executor) { CacheFactory snapshotListCacheFactory = new CacheFactory( @@ -66,6 +68,14 @@ public IcebergMetadataCache(ExecutorService executor) { true, null); this.tableCache = tableCacheFactory.buildCache(key -> loadTable(key), null, executor); + + CacheFactory snapshotCacheFactory = new CacheFactory( + OptionalLong.of(28800L), + OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60), + Config.max_external_table_cache_num, + true, + null); + this.snapshotCache = snapshotCacheFactory.buildCache(key -> loadSnapshot(key), null, executor); } public List getSnapshotList(TIcebergMetadataParams params) throws UserException { @@ -92,6 +102,11 @@ public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) { return restTable; } + public IcebergSnapshotCacheValue getSnapshotCache(CatalogIf catalog, String dbName, String tbName) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName); + return snapshotCache.get(key); + } + @NotNull private 
List loadSnapshots(IcebergMetadataCacheKey key) { Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName); @@ -114,6 +129,16 @@ private Table loadTable(IcebergMetadataCacheKey key) { () -> ops.loadTable(key.dbName, key.tableName)); } + @NotNull + private IcebergSnapshotCacheValue loadSnapshot(IcebergMetadataCacheKey key) throws AnalysisException { + IcebergExternalTable table = (IcebergExternalTable) key.catalog.getDbOrAnalysisException(key.dbName) + .getTableOrAnalysisException(key.tableName); + long snapshotId = table.getLatestSnapshotId(); + long schemaId = table.getSchemaId(snapshotId); + IcebergPartitionInfo icebergPartitionInfo = table.loadPartitionInfo(snapshotId); + return new IcebergSnapshotCacheValue(icebergPartitionInfo, new IcebergSnapshot(snapshotId, schemaId)); + } + public void invalidateCatalogCache(long catalogId) { snapshotListCache.asMap().keySet().stream() .filter(key -> key.catalog.getId() == catalogId) @@ -125,6 +150,10 @@ public void invalidateCatalogCache(long catalogId) { ManifestFiles.dropCache(entry.getValue().io()); tableCache.invalidate(entry.getKey()); }); + + snapshotCache.asMap().keySet().stream() + .filter(key -> key.catalog.getId() == catalogId) + .forEach(snapshotCache::invalidate); } public void invalidateTableCache(long catalogId, String dbName, String tblName) { @@ -143,6 +172,11 @@ public void invalidateTableCache(long catalogId, String dbName, String tblName) ManifestFiles.dropCache(entry.getValue().io()); tableCache.invalidate(entry.getKey()); }); + + snapshotCache.asMap().keySet().stream() + .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( + tblName)) + .forEach(snapshotCache::invalidate); } public void invalidateDbCache(long catalogId, String dbName) { @@ -159,6 +193,10 @@ public void invalidateDbCache(long catalogId, String dbName) { ManifestFiles.dropCache(entry.getValue().io()); tableCache.invalidate(entry.getKey()); }); + + snapshotCache.asMap().keySet().stream() + .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName)) + .forEach(snapshotCache::invalidate); } private static void initIcebergTableFileIO(Table table, Map props) { @@ -212,10 +250,12 @@ public int hashCode() { public Map> getCacheStats() { Map> res = Maps.newHashMap(); - res.put("iceberg_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(), + res.put("iceberg_snapshot_list_cache", ExternalMetaCacheMgr.getCacheStats(snapshotListCache.stats(), snapshotListCache.estimatedSize())); res.put("iceberg_table_cache", ExternalMetaCacheMgr.getCacheStats(tableCache.stats(), tableCache.estimatedSize())); + res.put("iceberg_snapshot_cache", ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(), + snapshotCache.estimatedSize())); return res; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java new file mode 100644 index 00000000000000..2c0155a71cd389 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMvccSnapshot.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.mvcc.MvccSnapshot; + +public class IcebergMvccSnapshot implements MvccSnapshot { + private final IcebergSnapshotCacheValue snapshotCacheValue; + + public IcebergMvccSnapshot(IcebergSnapshotCacheValue snapshotCacheValue) { + this.snapshotCacheValue = snapshotCacheValue; + } + + public IcebergSnapshotCacheValue getSnapshotCacheValue() { + return snapshotCacheValue; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java new file mode 100644 index 00000000000000..7931d91831fcec --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheKey.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
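+// Schema cache key extended with the Iceberg schema id: equals/hashCode include
+// the id, so several historical schemas of one table can coexist in the cache.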
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey; + +import com.google.common.base.Objects; + +public class IcebergSchemaCacheKey extends SchemaCacheKey { + private final long schemaId; + + public IcebergSchemaCacheKey(String dbName, String tableName, long schemaId) { + super(dbName, tableName); + this.schemaId = schemaId; + } + + public long getSchemaId() { + return schemaId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof IcebergSchemaCacheKey)) { + return false; + } + if (!super.equals(o)) { + return false; + } + IcebergSchemaCacheKey that = (IcebergSchemaCacheKey) o; + return schemaId == that.schemaId; + } + + @Override + public int hashCode() { + return Objects.hashCode(super.hashCode(), schemaId); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java index e1fde8049fe1ad..ccfcaab0c7261d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSchemaCacheValue.java @@ -25,26 +25,13 @@ public class IcebergSchemaCacheValue extends SchemaCacheValue { private final List partitionColumns; - private final IcebergPartitionInfo partitionInfo; - private final long snapshotId; - public IcebergSchemaCacheValue(List schema, List partitionColumns, - long snapshotId, IcebergPartitionInfo partitionInfo) { + public IcebergSchemaCacheValue(List schema, List partitionColumns) { super(schema); this.partitionColumns = partitionColumns; - this.snapshotId = snapshotId; - this.partitionInfo = partitionInfo; } public List getPartitionColumns() { return partitionColumns; } - - public IcebergPartitionInfo getPartitionInfo() { - return partitionInfo; - } - - public long getSnapshotId() { - return snapshotId; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java new file mode 100644 index 00000000000000..5903c362d7434e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshot.java @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
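+// Immutable (snapshotId, schemaId) pair describing the exact Iceberg table
+// version that a query or a materialized-view refresh is pinned to.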
+ +package org.apache.doris.datasource.iceberg; + +public class IcebergSnapshot { + private final long snapshotId; + private final long schemaId; + + public IcebergSnapshot(long snapshotId, long schemaId) { + this.snapshotId = snapshotId; + this.schemaId = schemaId; + } + + public long getSnapshotId() { + return snapshotId; + } + + public long getSchemaId() { + return schemaId; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java new file mode 100644 index 00000000000000..95c9a6f26cc5c5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergSnapshotCacheValue.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +public class IcebergSnapshotCacheValue { + + private final IcebergPartitionInfo partitionInfo; + private final IcebergSnapshot snapshot; + + public IcebergSnapshotCacheValue(IcebergPartitionInfo partitionInfo, IcebergSnapshot snapshot) { + this.partitionInfo = partitionInfo; + this.snapshot = snapshot; + } + + public IcebergPartitionInfo getPartitionInfo() { + return partitionInfo; + } + + public IcebergSnapshot getSnapshot() { + return snapshot; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index ba6d628e492c20..a7507fe031ff68 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -52,6 +52,7 @@ import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileFormat; @@ -107,6 +108,8 @@ public Integer initialValue() { // nickname in spark public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec"; + public static final long UNKNOWN_SNAPSHOT_ID = -1; + public static Expression convertToIcebergExpr(Expr expr, Schema schema) { if (expr == null) { return null; @@ -573,10 +576,17 @@ private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog /** * Get iceberg schema from catalog and convert them to doris schema */ - public static List getSchema(ExternalCatalog catalog, String dbName, String name) { + public static List getSchema(ExternalCatalog catalog, String dbName, String name, long schemaId) { return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> { 
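            // Resolve the schema pinned by schemaId when one is known; for an unknown
            // id, or an empty table with no current snapshot, use the latest schema.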
org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name); - Schema schema = icebergTable.schema(); + Schema schema; + if (schemaId == UNKNOWN_SNAPSHOT_ID || icebergTable.currentSnapshot() == null) { + schema = icebergTable.schema(); + } else { + schema = icebergTable.schemas().get((int) schemaId); + } + Preconditions.checkNotNull(schema, + "Schema for table " + catalog.getName() + "." + dbName + "." + name + " is null"); List columns = schema.columns(); List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); for (Types.NestedField field : columns) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java index 80d0a7c2429df3..3ba4804e52279c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java @@ -28,17 +28,21 @@ import com.google.common.collect.Maps; import com.google.common.collect.Range; import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; import mockit.Mocked; import mockit.Verifications; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.transforms.Days; import org.apache.iceberg.transforms.Hours; import org.apache.iceberg.transforms.Months; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -52,6 +56,16 @@ public void testIsSupportedPartitionTable(@Mocked org.apache.iceberg.Table icebe @Mocked Schema schema) { IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); Map specs = Maps.newHashMap(); + new MockUp() { + @Mock + private void makeSureInitialized() { + } + + @Mock + public Table getIcebergTable() { + return icebergTable; + } + }; // Test null specs.put(0, null); new Expectations() {{ @@ -139,34 +153,35 @@ public void testGetPartitionRange() throws AnalysisException { table.setPartitionColumns(partitionColumns); // Test null partition value - Range nullRange = table.getPartitionRange(null, "hour"); - Assertions.assertFalse(nullRange.hasLowerBound()); - Assertions.assertEquals("0000-01-02 00:00:00", + Range nullRange = table.getPartitionRange(null, "hour", partitionColumns); + Assertions.assertEquals("0000-01-01 00:00:00", + nullRange.lowerEndpoint().getPartitionValuesAsStringList().get(0)); + Assertions.assertEquals("0000-01-01 00:00:01", nullRange.upperEndpoint().getPartitionValuesAsStringList().get(0)); // Test hour transform. - Range hour = table.getPartitionRange("100", "hour"); + Range hour = table.getPartitionRange("100", "hour", partitionColumns); PartitionKey lowKey = hour.lowerEndpoint(); PartitionKey upKey = hour.upperEndpoint(); Assertions.assertEquals("1970-01-05 04:00:00", lowKey.getPartitionValuesAsStringList().get(0)); Assertions.assertEquals("1970-01-05 05:00:00", upKey.getPartitionValuesAsStringList().get(0)); // Test day transform. 
- Range day = table.getPartitionRange("100", "day"); + Range day = table.getPartitionRange("100", "day", partitionColumns); lowKey = day.lowerEndpoint(); upKey = day.upperEndpoint(); Assertions.assertEquals("1970-04-11 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); Assertions.assertEquals("1970-04-12 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); // Test month transform. - Range month = table.getPartitionRange("100", "month"); + Range month = table.getPartitionRange("100", "month", partitionColumns); lowKey = month.lowerEndpoint(); upKey = month.upperEndpoint(); Assertions.assertEquals("1978-05-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); Assertions.assertEquals("1978-06-01 00:00:00", upKey.getPartitionValuesAsStringList().get(0)); // Test year transform. - Range year = table.getPartitionRange("100", "year"); + Range year = table.getPartitionRange("100", "year", partitionColumns); lowKey = year.lowerEndpoint(); upKey = year.upperEndpoint(); Assertions.assertEquals("2070-01-01 00:00:00", lowKey.getPartitionValuesAsStringList().get(0)); @@ -174,7 +189,7 @@ public void testGetPartitionRange() throws AnalysisException { // Test unsupported transform Exception exception = Assertions.assertThrows(RuntimeException.class, () -> { - table.getPartitionRange("100", "bucket"); + table.getPartitionRange("100", "bucket", partitionColumns); }); Assertions.assertEquals("Unsupported transform bucket", exception.getMessage()); } @@ -183,15 +198,16 @@ public void testGetPartitionRange() throws AnalysisException { public void testSortRange() throws AnalysisException { IcebergExternalTable table = new IcebergExternalTable(1, "1", "2", null); Column c = new Column("c", PrimitiveType.DATETIMEV2); + ArrayList columns = Lists.newArrayList(c); table.setPartitionColumns(Lists.newArrayList(c)); - PartitionItem nullRange = new RangePartitionItem(table.getPartitionRange(null, "hour")); - PartitionItem year1970 = new RangePartitionItem(table.getPartitionRange("0", "year")); - PartitionItem year1971 = new RangePartitionItem(table.getPartitionRange("1", "year")); - PartitionItem month197002 = new RangePartitionItem(table.getPartitionRange("1", "month")); - PartitionItem month197103 = new RangePartitionItem(table.getPartitionRange("14", "month")); - PartitionItem month197204 = new RangePartitionItem(table.getPartitionRange("27", "month")); - PartitionItem day19700202 = new RangePartitionItem(table.getPartitionRange("32", "day")); - PartitionItem day19730101 = new RangePartitionItem(table.getPartitionRange("1096", "day")); + PartitionItem nullRange = new RangePartitionItem(table.getPartitionRange(null, "hour", columns)); + PartitionItem year1970 = new RangePartitionItem(table.getPartitionRange("0", "year", columns)); + PartitionItem year1971 = new RangePartitionItem(table.getPartitionRange("1", "year", columns)); + PartitionItem month197002 = new RangePartitionItem(table.getPartitionRange("1", "month", columns)); + PartitionItem month197103 = new RangePartitionItem(table.getPartitionRange("14", "month", columns)); + PartitionItem month197204 = new RangePartitionItem(table.getPartitionRange("27", "month", columns)); + PartitionItem day19700202 = new RangePartitionItem(table.getPartitionRange("32", "day", columns)); + PartitionItem day19730101 = new RangePartitionItem(table.getPartitionRange("1096", "day", columns)); Map map = Maps.newHashMap(); map.put("nullRange", nullRange); map.put("year1970", year1970); diff --git a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out 
b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
index c9d9799da81300..483ac0957e6f67 100644
--- a/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
+++ b/regression-test/data/mtmv_p0/test_iceberg_mtmv.out
@@ -103,3 +103,18 @@
 2024-09-30 6
 2024-10-28 7
 
+-- !refresh_one_partition --
+2024-01-01T00:00 4
+
+-- !refresh_one_partition_rewrite --
+2024-01-01T00:00 4
+2024-01-02T00:00 3
+
+-- !refresh_auto --
+2024-01-01T00:00 4
+2024-01-02T00:00 3
+
+-- !refresh_all_partition_rewrite --
+2024-01-01T00:00 4
+2024-01-02T00:00 3
+
diff --git a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
index 59cf1173acb46b..aee80d8d1693a4 100644
--- a/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
+++ b/regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
@@ -83,6 +83,7 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
         String icebergDb = "iceberg_mtmv_partition"
         String icebergTable1 = "tstable"
         String icebergTable2 = "dtable"
+        String icebergTable3 = "union_test"
         sql """drop catalog if exists ${catalog_name} """
         sql """create catalog if not exists ${catalog_name} properties (
             'type'='iceberg',
@@ -210,6 +211,61 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
         sql """drop materialized view if exists ${mvName2};"""
         sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable2}"""
 
+        // Test rewrite and union partitions
+        sql """set materialized_view_rewrite_enable_contain_external_table=true;"""
+        String mvSql = "SELECT par,count(*) as num FROM ${catalog_name}.${icebergDb}.${icebergTable3} group by par"
+        String mvName = "union_mv"
+        sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable3}"""
+        sql """
+            CREATE TABLE ${catalog_name}.${icebergDb}.${icebergTable3} (
+                id int,
+                value int,
+                par datetime
+            ) ENGINE=iceberg
+            PARTITION BY LIST (day(par)) ();
+        """
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable3} values (1, 1, "2024-01-01"), (2, 1, "2024-01-01"), (3, 1, "2024-01-01"), (4, 1, "2024-01-01")"""
+        sql """insert into ${catalog_name}.${icebergDb}.${icebergTable3} values (1, 2, "2024-01-02"), (2, 2, "2024-01-02"), (3, 2, "2024-01-02")"""
+        sql """analyze table ${catalog_name}.${icebergDb}.${icebergTable3} with sync"""
+
+        sql """drop materialized view if exists ${mvName};"""
+        sql """
+            CREATE MATERIALIZED VIEW ${mvName}
+            BUILD DEFERRED REFRESH AUTO ON MANUAL
+            partition by(`par`)
+            DISTRIBUTED BY RANDOM BUCKETS 2
+            PROPERTIES ('replication_num' = '1')
+            AS ${mvSql}
+        """
+
+        def showPartitions = sql """show partitions from ${mvName}"""
+        logger.info("showPartitions: " + showPartitions.toString())
+        assertTrue(showPartitions.toString().contains("p_20240101000000_20240102000000"))
+        assertTrue(showPartitions.toString().contains("p_20240102000000_20240103000000"))
+
+        // refresh one partition
+        sql """REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20240101000000_20240102000000);"""
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
+        def explainOnePartition = sql """ explain ${mvSql} """
+        logger.info("explainOnePartition: " + explainOnePartition.toString())
+        assertTrue(explainOnePartition.toString().contains("VUNION"))
+        order_qt_refresh_one_partition_rewrite "${mvSql}"
+        mv_rewrite_success("${mvSql}", "${mvName}")
+
+        // refresh auto
+        sql """REFRESH MATERIALIZED VIEW ${mvName} auto"""
+        waitingMTMVTaskFinishedByMvName(mvName)
+        order_qt_refresh_auto "SELECT * 
FROM ${mvName} " + def explainAllPartition = sql """ explain ${mvSql}; """ + logger.info("explainAllPartition: " + explainAllPartition.toString()) + assertTrue(explainAllPartition.toString().contains("VOlapScanNode")) + order_qt_refresh_all_partition_rewrite "${mvSql}" + mv_rewrite_success("${mvSql}", "${mvName}") + + sql """drop materialized view if exists ${mvName};""" + sql """drop table if exists ${catalog_name}.${icebergDb}.${icebergTable3}""" + sql """ drop catalog if exists ${catalog_name} """ } }
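
A minimal, self-contained sketch of the snapshot-cache pattern that the IcebergMetadataCache change above introduces. It is illustrative only: plain Caffeine with no Doris classes; SnapshotCacheSketch, PinnedSnapshot, and loadLatest are invented stand-in names, and the 28800-second write expiry mirrors the value hard-coded in the new cache factory.

    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;

    import java.util.Optional;
    import java.util.concurrent.TimeUnit;

    public class SnapshotCacheSketch {
        // Stand-in for IcebergSnapshotCacheValue: an immutable per-version record.
        static final class PinnedSnapshot {
            final long snapshotId;
            final long schemaId;

            PinnedSnapshot(long snapshotId, long schemaId) {
                this.snapshotId = snapshotId;
                this.schemaId = schemaId;
            }
        }

        // Values expire a fixed time after write, so the latest Iceberg snapshot is
        // re-read from the catalog at most once per expiry window.
        private final LoadingCache<String, PinnedSnapshot> cache = Caffeine.newBuilder()
                .expireAfterWrite(28800, TimeUnit.SECONDS)
                .maximumSize(1000)
                .build(this::loadLatest);

        // Placeholder for the table.currentSnapshot() / snapshot.schemaId() lookups.
        private PinnedSnapshot loadLatest(String qualifiedName) {
            return new PinnedSnapshot(42L, 7L);
        }

        // Mirrors getOrFetchSnapshotCacheValue: a query that pinned a snapshot at plan
        // time keeps using it; all other callers read the cached latest value.
        PinnedSnapshot getOrFetch(String qualifiedName, Optional<PinnedSnapshot> pinned) {
            return pinned.orElseGet(() -> cache.get(qualifiedName));
        }

        public static void main(String[] args) {
            SnapshotCacheSketch sketch = new SnapshotCacheSketch();
            PinnedSnapshot pinned = sketch.cache.get("db.tbl"); // pin once per query
            System.out.println(sketch.getOrFetch("db.tbl", Optional.of(pinned)).snapshotId);
        }
    }

Pinning is also why getFullSchema() and getAndCopyPartitionItems() above consult the MvccSnapshot from the planner context first: every read within one query then sees the same (snapshotId, schemaId, partition info) triple even if the Iceberg table advances mid-query.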
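The time arithmetic behind getPartitionRange() can be checked with plain java.time. Below is a sketch under the assumption of UTC, covering only the four transforms the patch supports; TransformRangeSketch and toRange are illustrative names, and the expected values match testGetPartitionRange above.

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class TransformRangeSketch {
        // Iceberg stores a time-transform partition value as an offset from the Unix
        // epoch: "100" under the "day" transform is the 100th day after 1970-01-01.
        // The patch widens that single value into a [lower, upper) range so it can
        // back a Doris range partition.
        static LocalDateTime[] toRange(long value, String transform) {
            LocalDateTime epoch = Instant.EPOCH.atZone(ZoneId.of("UTC")).toLocalDateTime();
            LocalDateTime lower;
            LocalDateTime upper;
            switch (transform) {
                case "hour":
                    lower = epoch.plusHours(value);
                    upper = lower.plusHours(1);
                    break;
                case "day":
                    lower = epoch.plusDays(value);
                    upper = lower.plusDays(1);
                    break;
                case "month":
                    lower = epoch.plusMonths(value);
                    upper = lower.plusMonths(1);
                    break;
                case "year":
                    lower = epoch.plusYears(value);
                    upper = lower.plusYears(1);
                    break;
                default:
                    throw new RuntimeException("Unsupported transform " + transform);
            }
            return new LocalDateTime[] {lower, upper};
        }

        public static void main(String[] args) {
            // "100"/"hour"  -> [1970-01-05T04:00, 1970-01-05T05:00)
            // "100"/"day"   -> [1970-04-11T00:00, 1970-04-12T00:00)
            // "100"/"month" -> [1978-05-01T00:00, 1978-06-01T00:00)
            // "100"/"year"  -> [2070-01-01T00:00, 2071-01-01T00:00)
            LocalDateTime[] day = toRange(100, "day");
            System.out.println(day[0] + " .. " + day[1]);
        }
    }

Note the related NULL handling change in the patch: a NULL partition value no longer produces an unbounded lessThan range but a minimal closed-open range starting at 0000-01-01, which is what lets RangePartitionItem.toPartitionKeyDesc() drop its unbounded branch.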