Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](mtmv)Mv paimon refresh2 #44345

Closed
wants to merge 37 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource;

import org.apache.doris.catalog.TableIf;

/**
 * A table that exposes numbered snapshots and lets callers hold references to a specific snapshot.
 *
 * NOTE(review): the exact contract of ref/unref is defined by the implementations;
 * the names suggest reference counting of per-snapshot state - confirm against the implementing cache.
 */
public interface MvccTable extends TableIf {
/**
 * @return the id of the most recent snapshot known for this table
 */
long getLatestSnapshotId();

/**
 * Registers a reference to the given snapshot.
 * Presumably must be paired with a later {@link #unref(long)} for the same id - TODO confirm.
 *
 * @param snapshotId id of the snapshot being referenced
 */
void ref(long snapshotId);

/**
 * Releases a reference to the given snapshot.
 *
 * @param snapshotId id of the snapshot being released
 */
void unref(long snapshotId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,23 @@
package org.apache.doris.datasource.paimon;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.MvccTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
Expand All @@ -30,25 +43,37 @@
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.system.PartitionsTable;
import org.apache.paimon.table.system.SnapshotsTable;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class PaimonExternalTable extends ExternalTable {
public class PaimonExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf, MvccTable {

private static final Logger LOG = LogManager.getLogger(PaimonExternalTable.class);

Expand All @@ -67,10 +92,78 @@ protected synchronized void makeSureInitialized() {
}
}

public Table getPaimonTable() {
public Table getPaimonTable(long snapshotId) {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue) value).getPaimonTable()).orElse(null);
if (!schemaCacheValue.isPresent()) {
return null;
}
return ((PaimonSchemaCacheValue) schemaCacheValue.get()).getPaimonTable().copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), String.valueOf(snapshotId)));
}

/**
 * Returns the partition info stored in the cached schema value.
 *
 * @return the cached {@link PaimonPartitionInfo}, or a freshly constructed empty one
 *         when no schema cache value is present
 */
private PaimonPartitionInfo getPartitionInfoFromCache() {
    makeSureInitialized();
    Optional<SchemaCacheValue> cached = getSchemaCacheValue();
    return cached.isPresent()
            ? ((PaimonSchemaCacheValue) cached.get()).getPartitionInfo()
            : new PaimonPartitionInfo();
}

/**
 * Returns the partition columns stored in the cached schema value.
 *
 * @return the cached partition columns, or a new empty list when no schema cache value is present
 */
private List<Column> getPartitionColumnsFromCache() {
    makeSureInitialized();
    Optional<SchemaCacheValue> cached = getSchemaCacheValue();
    if (cached.isPresent()) {
        PaimonSchemaCacheValue paimonValue = (PaimonSchemaCacheValue) cached.get();
        return paimonValue.getPartitionColumns();
    }
    return Lists.newArrayList();
}

/**
 * Returns the snapshot id recorded in the cached schema value.
 *
 * @return the snapshot id held by the cached {@link PaimonSchemaCacheValue}
 * @throws AnalysisException if no schema cache value is present
 */
public long getLatestSnapshotIdFromCache() throws AnalysisException {
    makeSureInitialized();
    SchemaCacheValue cached = getSchemaCacheValue()
            .orElseThrow(() -> new AnalysisException("not present"));
    return ((PaimonSchemaCacheValue) cached).getSnapshootId();
}

/**
 * Returns the partition-name to partition-item mapping for the given snapshot.
 *
 * Per the original author's note, callers need to ref/unref the snapshot around this call.
 *
 * @param snapshotId id of the snapshot whose partitions are requested
 * @return mapping from partition name to {@link PartitionItem} for that snapshot
 * @throws DdlException propagated from the snapshot cache lookup
 */
public Map<String, PartitionItem> getPartitions(long snapshotId) throws DdlException {
    PaimonSchemaCacheValue snapshotCacheValue = getSchemaCacheBySnapshotId(snapshotId);
    return snapshotCacheValue.getPartitionInfo().getNameToPartitionItem();
}

/**
 * Looks up the schema cache value for a specific snapshot of this table
 * by delegating to {@link PaimonSnapshotCache}.
 *
 * @param snapshotId id of the snapshot to look up
 * @return the schema cache value returned by the snapshot cache
 * @throws DdlException propagated from the snapshot cache
 */
private PaimonSchemaCacheValue getSchemaCacheBySnapshotId(long snapshotId) throws DdlException {
return PaimonSnapshotCache.getSchemaCacheBySnapshotId(this, snapshotId);
}

/**
 * Returns the currently cached schema value, narrowed to the Paimon-specific type.
 *
 * @return the cached {@link PaimonSchemaCacheValue}, or an empty Optional when nothing is cached
 */
public Optional<PaimonSchemaCacheValue> getLatestSchemaCache() {
    makeSureInitialized();
    return getSchemaCacheValue().map(value -> (PaimonSchemaCacheValue) value);
}

/**
 * Returns the latest snapshot id from the schema cache.
 *
 * @return the cached snapshot id, or {@code -1} when the cache lookup raises an
 *         {@link AnalysisException}
 */
@Override
public long getLatestSnapshotId() {
    long snapshotId;
    try {
        snapshotId = getLatestSnapshotIdFromCache();
    } catch (AnalysisException e) {
        // The cache lookup failed; -1 is reported instead of propagating the exception.
        snapshotId = -1L;
    }
    return snapshotId;
}

/**
 * Registers a reference to the given snapshot of this table by delegating to
 * {@link PaimonSnapshotCache}.
 *
 * @param snapshotId id of the snapshot being referenced
 */
@Override
public void ref(long snapshotId) {
PaimonSnapshotCache.ref(this, snapshotId);
}

/**
 * Releases a reference to the given snapshot of this table by delegating to
 * {@link PaimonSnapshotCache}.
 *
 * @param snapshotId id of the snapshot being released
 */
@Override
public void unref(long snapshotId) {
    PaimonSnapshotCache.unref(this, snapshotId);
}

@Override
Expand All @@ -79,12 +172,62 @@ public Optional<SchemaCacheValue> initSchema() {
TableSchema schema = ((FileStoreTable) paimonTable).schema();
List<DataField> columns = schema.fields();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
Set<String> partitionColumnNames = Sets.newHashSet(paimonTable.partitionKeys());
List<Column> partitionColumns = Lists.newArrayList();
for (DataField field : columns) {
tmpSchema.add(new Column(field.name().toLowerCase(),
Column column = new Column(field.name().toLowerCase(),
paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
field.id()));
field.id());
tmpSchema.add(column);
if (partitionColumnNames.contains(field.name())) {
partitionColumns.add(column);
}
}
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
try {
// after 0.9.0 paimon will support table.getLatestSnapshotId()
long latestSnapshotId = loadLatestSnapshotId();
PaimonPartitionInfo partitionInfo = loadPartitionInfo(partitionColumns);
return Optional.of(new PaimonSchemaCacheValue(tmpSchema, partitionColumns, paimonTable, latestSnapshotId,
partitionInfo));
} catch (IOException | AnalysisException e) {
LOG.warn(e);
return Optional.empty();
}
}

/**
 * Reads this table's snapshots system table and returns the largest snapshot id found.
 *
 * @return the maximum value of column 0 (the snapshot id) across all rows, or {@code 0}
 *         when the system table yields no rows
 * @throws IOException if reading the system table fails
 */
private long loadLatestSnapshotId() throws IOException {
    String snapshotsTableName = name + Catalog.SYSTEM_TABLE_SPLITTER + SnapshotsTable.SNAPSHOTS;
    Table snapshotsTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, snapshotsTableName);
    // Project only column 0, which holds the snapshot id.
    int[][] projection = new int[][] {{0}};
    long maxSnapshotId = 0L;
    for (InternalRow snapshotRow : PaimonUtil.read(snapshotsTable, projection)) {
        maxSnapshotId = Math.max(maxSnapshotId, snapshotRow.getLong(0));
    }
    return maxSnapshotId;
}

/**
 * Builds the partition info for this table.
 *
 * @param partitionColumns partition columns of the table; may be null or empty
 * @return an empty {@link PaimonPartitionInfo} when there are no partition columns,
 *         otherwise the info generated from the loaded partitions
 * @throws IOException if loading the partitions fails
 * @throws AnalysisException propagated from partition info generation
 */
private PaimonPartitionInfo loadPartitionInfo(List<Column> partitionColumns) throws IOException, AnalysisException {
    if (!CollectionUtils.isEmpty(partitionColumns)) {
        return PaimonUtil.generatePartitionInfo(partitionColumns, loadPartitions());
    }
    // No partition columns: nothing to load.
    return new PaimonPartitionInfo();
}

/**
 * Reads this table's partitions system table and converts every row to a {@link PaimonPartition}.
 *
 * @return one {@link PaimonPartition} per row, in the order the rows were read
 * @throws IOException if reading the system table fails
 */
private List<PaimonPartition> loadPartitions() throws IOException {
    String partitionsTableName = name + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS;
    Table partitionsTable = ((PaimonExternalCatalog) catalog).getPaimonTable(dbName, partitionsTableName);
    // A null projection is passed; presumably this reads all columns - confirm in PaimonUtil.read.
    List<InternalRow> partitionRows = PaimonUtil.read(partitionsTable, null);
    List<PaimonPartition> partitions = new ArrayList<>(partitionRows.size());
    for (InternalRow partitionRow : partitionRows) {
        PaimonPartition partition = PaimonUtil.rowToPartition(partitionRow);
        partitions.add(partition);
    }
    return partitions;
}

private Type paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
Expand Down Expand Up @@ -205,4 +348,56 @@ public long fetchRowCount() {
}
return rowCount > 0 ? rowCount : UNKNOWN_ROW_COUNT;
}

/**
 * Hook run before a materialized view refresh: asks the environment's refresh manager
 * to refresh this table, identified by catalog name, database name and table name.
 *
 * The {@code mtmv} argument is not used here.
 * NOTE(review): the meaning of the trailing {@code true} flag is defined by
 * the refresh manager's refreshTable - confirm there.
 *
 * @param mtmv the materialized view about to be refreshed
 * @throws DdlException propagated from the refresh manager
 */
@Override
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
Env.getCurrentEnv().getRefreshManager()
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
}

/**
 * Returns a copy of the cached partition-name to partition-item mapping.
 *
 * @return a new map holding the cached entries; changes to it do not affect the cache's map
 */
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
    PaimonPartitionInfo cachedPartitionInfo = getPartitionInfoFromCache();
    return new HashMap<>(cachedPartitionInfo.getNameToPartitionItem());
}

/**
 * Reports how this table is partitioned.
 *
 * @return {@link PartitionType#LIST} when the table has at least one cached partition column,
 *         otherwise {@link PartitionType#UNPARTITIONED}
 */
@Override
public PartitionType getPartitionType() {
    if (getPartitionColumnsFromCache().isEmpty()) {
        return PartitionType.UNPARTITIONED;
    }
    return PartitionType.LIST;
}

/**
 * Returns the lower-cased names of the cached partition columns.
 *
 * @return a set with one lower-cased name per partition column; empty when there are none
 */
@Override
public Set<String> getPartitionColumnNames() {
    Set<String> lowerCasedNames = Sets.newHashSet();
    for (Column partitionColumn : getPartitionColumnsFromCache()) {
        lowerCasedNames.add(partitionColumn.getName().toLowerCase());
    }
    return lowerCasedNames;
}

/**
 * Returns the partition columns of this table as held in the schema cache.
 *
 * @return the cached partition columns (an empty list when nothing is cached)
 */
@Override
public List<Column> getPartitionColumns() {
return getPartitionColumnsFromCache();
}

/**
 * Builds a timestamp snapshot for one partition from its cached last-update time.
 *
 * @param partitionName name of the partition to look up in the cached partition info
 * @param context refresh context; not used by this implementation
 * @return an {@link MTMVTimestampSnapshot} carrying the partition's last update time
 * @throws AnalysisException if the cached partition info has no partition with that name
 */
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
        throws AnalysisException {
    PaimonPartition partition = getPartitionInfoFromCache().getNameToPartition().get(partitionName);
    if (partition != null) {
        return new MTMVTimestampSnapshot(partition.getLastUpdateTime());
    }
    throw new AnalysisException("can not find partition: " + partitionName);
}

/**
 * Builds a version snapshot for the whole table from the cached latest snapshot id.
 *
 * @param context refresh context; not used by this implementation
 * @return an {@link MTMVVersionSnapshot} carrying the cached latest snapshot id
 * @throws AnalysisException if no schema cache value is present
 */
@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
    long latestSnapshotId = getLatestSnapshotIdFromCache();
    return new MTMVVersionSnapshot(latestSnapshotId);
}

/**
 * Reports whether partition columns of this table may be null.
 *
 * @return always {@code true}; see the trade-off described in the body
 */
@Override
public boolean isPartitionColumnAllowNull() {
// Paimon writes to the 'null' partition regardless of whether the value is null or the string 'null'.
// That is inconsistent with Doris' empty partition logic, so strictly this should return false.
// However, when Spark creates Paimon tables, specifying 'not null' does not take effect.
// In order to successfully create the materialized view, true is returned here.
// The cost is that when Paimon writes a null partition value, the materialized view cannot detect this data.
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.paimon;

// https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
/**
 * Immutable value holder for one row of Paimon's partitions system table.
 */
public class PaimonPartition {
    /** Partition values, for example: [1, dd]. */
    private final String partitionValues;
    /** The amount of data (record count) in the partition. */
    private final long recordCount;
    /** Total size of the partition's files, in bytes. */
    private final long fileSizeInBytes;
    /** Number of files in the partition. */
    private final long fileCount;
    /** Last update time of the partition. NOTE(review): unit is not visible here - confirm. */
    private final long lastUpdateTime;

    /**
     * Creates a partition descriptor; every argument is stored as given.
     *
     * @param partitionValues partition values, for example {@code [1, dd]}
     * @param recordCount amount of data in the partition
     * @param fileSizeInBytes partition file size in bytes
     * @param fileCount number of partition files
     * @param lastUpdateTime last update time of the partition
     */
    public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
            long lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
        this.fileCount = fileCount;
        this.fileSizeInBytes = fileSizeInBytes;
        this.recordCount = recordCount;
        this.partitionValues = partitionValues;
    }

    /** @return the partition values exactly as passed to the constructor */
    public String getPartitionValues() {
        return this.partitionValues;
    }

    /** @return the amount of data in the partition */
    public long getRecordCount() {
        return this.recordCount;
    }

    /** @return the partition file size in bytes */
    public long getFileSizeInBytes() {
        return this.fileSizeInBytes;
    }

    /** @return the number of partition files */
    public long getFileCount() {
        return this.fileCount;
    }

    /** @return the last update time of the partition */
    public long getLastUpdateTime() {
        return this.lastUpdateTime;
    }
}
Loading
Loading