From dffdb0d685643be548afa03abd575b310395e93c Mon Sep 17 00:00:00 2001 From: Vallish Date: Fri, 1 Nov 2024 13:12:28 +0000 Subject: [PATCH] [Enhancement] Support move truncated old data to recycle bin --- .../org/apache/doris/nereids/DorisParser.g4 | 2 +- fe/fe-core/src/main/cup/sql_parser.cup | 4 +- .../doris/analysis/TruncateTableStmt.java | 8 +- .../org/apache/doris/catalog/OlapTable.java | 88 ++++++++++++++++++- .../doris/catalog/RecyclePartitionParam.java | 31 +++++++ .../doris/datasource/InternalCatalog.java | 72 ++++++--------- .../doris/persist/TruncateTableInfo.java | 10 ++- .../test_truncate_recover.out | 24 +++++ .../test_truncate_recover_list.out | 20 +++++ .../test_truncate_recover_no_partition.out | 15 ++++ ...sert_overwrite_recover_no_partition.groovy | 4 +- .../test_truncate_recover.groovy | 74 ++++++++++++++++ .../test_truncate_recover_list.groovy | 57 ++++++++++++ .../test_truncate_recover_no_partition.groovy | 79 +++++++++++++++++ 14 files changed, 433 insertions(+), 55 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/RecyclePartitionParam.java create mode 100644 regression-test/data/catalog_recycle_bin_p0/test_truncate_recover.out create mode 100644 regression-test/data/catalog_recycle_bin_p0/test_truncate_recover_list.out create mode 100644 regression-test/data/catalog_recycle_bin_p0/test_truncate_recover_no_partition.out create mode 100644 regression-test/suites/catalog_recycle_bin_p0/test_truncate_recover.groovy create mode 100644 regression-test/suites/catalog_recycle_bin_p0/test_truncate_recover_list.groovy create mode 100644 regression-test/suites/catalog_recycle_bin_p0/test_truncate_recover_no_partition.groovy diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 80da53a51bde331..d263b1f8bf4f3c0 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -851,7 +851,7 @@ unsupportedUseStatement ; unsupportedDmlStatement - : TRUNCATE TABLE multipartIdentifier specifiedPartition? #truncateTable + : TRUNCATE TABLE multipartIdentifier specifiedPartition? FORCE? #truncateTable | COPY INTO name=multipartIdentifier columns=identifierList? FROM (stageAndPattern | (LEFT_PAREN SELECT selectColumnClause FROM stageAndPattern whereClause? RIGHT_PAREN)) diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index dca7752761c0a49..d6aba3fe378e141 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -7976,9 +7976,9 @@ admin_stmt ::= ; truncate_stmt ::= - KW_TRUNCATE KW_TABLE base_table_ref:tblRef + KW_TRUNCATE KW_TABLE base_table_ref:tblRef opt_force:force {: - RESULT = new TruncateTableStmt(tblRef); + RESULT = new TruncateTableStmt(tblRef, force); :} ; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TruncateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TruncateTableStmt.java index 9543ff1853b742b..7686fb663338b5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TruncateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TruncateTableStmt.java @@ -30,15 +30,21 @@ public class TruncateTableStmt extends DdlStmt implements NotFallbackInParser { private TableRef tblRef; + private boolean forceDrop; - public TruncateTableStmt(TableRef tblRef) { + public TruncateTableStmt(TableRef tblRef, boolean forceDrop) { this.tblRef = tblRef; + this.forceDrop = forceDrop; } public TableRef getTblRef() { return tblRef; } + public boolean isForceDrop() { + return forceDrop; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException, UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 999e0c43995f00a..4c80a0f65140816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1196,6 +1196,75 @@ public Partition dropPartition(long dbId, String partitionName, boolean isForceD return dropPartition(dbId, partitionName, isForceDrop, !isForceDrop); } + public Partition dropPartitionForTruncate(long dbId, boolean isForceDrop, + RecyclePartitionParam recyclePartitionParam) { + // 1. If "isForceDrop" is false, the partition will be added to the Catalog Recycle bin, and all tablets of this + // partition will not be deleted. + // 2. If "isForceDrop" is true, the partition will be dropped immediately + Partition partition = recyclePartitionParam.partition; + if (partition != null) { + idToPartition.remove(partition.getId()); + + if (!isForceDrop) { + // recycle partition + if (partitionInfo.getType() == PartitionType.RANGE) { + Env.getCurrentRecycleBin().recyclePartition(dbId, id, name, partition, + recyclePartitionParam.partitionItem.getItems(), + new ListPartitionItem(Lists.newArrayList(new PartitionKey())), + recyclePartitionParam.dataProperty, + recyclePartitionParam.replicaAlloc, + recyclePartitionParam.isInMemory, + recyclePartitionParam.isMutable); + + } else if (partitionInfo.getType() == PartitionType.LIST) { + // construct a dummy range + List dummyColumns = new ArrayList<>(); + dummyColumns.add(new Column("dummy", PrimitiveType.INT)); + PartitionKey dummyKey = null; + try { + dummyKey = PartitionKey.createInfinityPartitionKey(dummyColumns, false); + } catch (AnalysisException e) { + LOG.warn("should not happen", e); + } + Range dummyRange = Range.open(new PartitionKey(), dummyKey); + + Env.getCurrentRecycleBin().recyclePartition(dbId, id, name, partition, + dummyRange, + recyclePartitionParam.partitionItem, + recyclePartitionParam.dataProperty, + recyclePartitionParam.replicaAlloc, + 
recyclePartitionParam.isInMemory, + recyclePartitionParam.isMutable); + } else { + // unpartition + // construct a dummy range and dummy list. + List dummyColumns = new ArrayList<>(); + dummyColumns.add(new Column("dummy", PrimitiveType.INT)); + PartitionKey dummyKey = null; + try { + dummyKey = PartitionKey.createInfinityPartitionKey(dummyColumns, false); + } catch (AnalysisException e) { + LOG.warn("should not happen", e); + } + Range dummyRange = Range.open(new PartitionKey(), dummyKey); + Env.getCurrentRecycleBin().recyclePartition(dbId, id, name, partition, + dummyRange, + new ListPartitionItem(Lists.newArrayList(new PartitionKey())), + recyclePartitionParam.dataProperty, + recyclePartitionParam.replicaAlloc, + recyclePartitionParam.isInMemory, + recyclePartitionParam.isMutable); + } + } else { + Env.getCurrentEnv().onErasePartition(partition); + } + + // drop partition info + partitionInfo.dropPartition(partition.getId()); + } + return partition; + } + /* * A table may contain both formal and temporary partitions. * There are several methods to get the partition of a table. @@ -2032,13 +2101,24 @@ public static OlapTable read(DataInput in) throws IOException { return GsonUtils.GSON.fromJson(Text.readString(in), OlapTable.class); } + + public void fillInfo(Partition partition, RecyclePartitionParam recyclePartitionParam) { + recyclePartitionParam.dataProperty = partitionInfo.getDataProperty(partition.getId()); + recyclePartitionParam.replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId()); + recyclePartitionParam.isInMemory = partitionInfo.getIsInMemory(partition.getId()); + recyclePartitionParam.isMutable = partitionInfo.getIsMutable(partition.getId()); + recyclePartitionParam.partitionItem = partitionInfo.getItem(partition.getId()); + recyclePartitionParam.partition = partition; + } + /* * this method is currently used for truncating table(partitions). 
* the new partition has new id, so we need to change all 'id-related' members * * return the old partition. */ - public Partition replacePartition(Partition newPartition) { + public Partition replacePartition(Partition newPartition, + RecyclePartitionParam recyclePartitionParam) { Partition oldPartition = nameToPartition.remove(newPartition.getName()); idToPartition.remove(oldPartition.getId()); @@ -2049,6 +2129,12 @@ public Partition replacePartition(Partition newPartition) { ReplicaAllocation replicaAlloc = partitionInfo.getReplicaAllocation(oldPartition.getId()); boolean isInMemory = partitionInfo.getIsInMemory(oldPartition.getId()); boolean isMutable = partitionInfo.getIsMutable(oldPartition.getId()); + recyclePartitionParam.dataProperty = dataProperty; + recyclePartitionParam.replicaAlloc = replicaAlloc; + recyclePartitionParam.isInMemory = isInMemory; + recyclePartitionParam.isMutable = isMutable; + recyclePartitionParam.partitionItem = partitionInfo.getItem(oldPartition.getId()); + recyclePartitionParam.partition = oldPartition; if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecyclePartitionParam.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecyclePartitionParam.java new file mode 100644 index 000000000000000..51f38638938db5b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecyclePartitionParam.java @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +public class RecyclePartitionParam { + public Partition partition; + public PartitionItem partitionItem; + public DataProperty dataProperty; + public ReplicaAllocation replicaAlloc; + public boolean isInMemory; + public boolean isMutable = true; + + public RecyclePartitionParam() { + // do nothing. + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index f8183028c6acd62..8a180f7cd215db5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -103,6 +103,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.catalog.RecyclePartitionParam; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Replica.ReplicaState; import org.apache.doris.catalog.ReplicaAllocation; @@ -3702,13 +3703,17 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti throw new DdlException("Table[" + copiedTbl.getName() + "]'s meta has been changed. 
try again."); } - // replace - oldPartitions = truncateTableInternal(olapTable, newPartitions, truncateEntireTable); + boolean isForceDrop = truncateTableStmt.isForceDrop(); + //replace + Map recyclePartitionParamMap = new HashMap<>(); + oldPartitions = truncateTableInternal(olapTable, newPartitions, + truncateEntireTable, recyclePartitionParamMap, isForceDrop); // write edit log TruncateTableInfo info = new TruncateTableInfo(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(), - newPartitions, truncateEntireTable, truncateTableStmt.toSqlWithoutTable(), oldPartitions); + newPartitions, truncateEntireTable, + truncateTableStmt.toSqlWithoutTable(), oldPartitions, isForceDrop); Env.getCurrentEnv().getEditLog().logTruncateTable(info); } catch (DdlException e) { failedCleanCallback.run(); @@ -3719,8 +3724,6 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } } - erasePartitionDropBackendReplicas(oldPartitions); - PartitionNames partitionNames = truncateEntireTable ? null : new PartitionNames(false, tblRef.getPartitionNames().getPartitionNames()); Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable, partitionNames); @@ -3729,76 +3732,51 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } private List truncateTableInternal(OlapTable olapTable, List newPartitions, - boolean isEntireTable) { + boolean isEntireTable, Map recyclePartitionParamMap, boolean isforceDrop) { // use new partitions to replace the old ones. 
List oldPartitions = Lists.newArrayList(); - Set oldTabletIds = Sets.newHashSet(); for (Partition newPartition : newPartitions) { - Partition oldPartition = olapTable.replacePartition(newPartition); + RecyclePartitionParam recyclePartitionParam = new RecyclePartitionParam(); + Partition oldPartition = olapTable.replacePartition(newPartition, recyclePartitionParam); oldPartitions.add(oldPartition); - // save old tablets to be removed - for (MaterializedIndex index : oldPartition.getMaterializedIndices(IndexExtState.ALL)) { - index.getTablets().forEach(t -> { - oldTabletIds.add(t.getId()); - }); - } + recyclePartitionParamMap.put(oldPartition.getId(), recyclePartitionParam); } if (isEntireTable) { Set oldPartitionsIds = oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet()); for (Partition partition : olapTable.getAllTempPartitions()) { if (!oldPartitionsIds.contains(partition.getId())) { + RecyclePartitionParam recyclePartitionParam = new RecyclePartitionParam(); + olapTable.fillInfo(partition, recyclePartitionParam); oldPartitions.add(partition); + recyclePartitionParamMap.put(partition.getId(), recyclePartitionParam); + // clear temp partition from memory. + // tablet may be moved to recycle bin or deleted inside + // dropPartitionForTruncate function. 
+ olapTable.dropTempPartition(partition.getName(), false); } } - // drop all temp partitions - olapTable.dropAllTempPartitions(); } - // remove the tablets in old partitions - for (Long tabletId : oldTabletIds) { - Env.getCurrentInvertedIndex().deleteTablet(tabletId); + for (Map.Entry pair : recyclePartitionParamMap.entrySet()) { + olapTable.dropPartitionForTruncate(olapTable.getDatabase().getId(), isforceDrop, pair.getValue()); } return oldPartitions; } public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException { - List oldPartitions = Lists.newArrayList(); + boolean isForceDrop = info.getForce(); Database db = (Database) getDbOrMetaException(info.getDbId()); OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTblId(), TableType.OLAP); olapTable.writeLock(); try { - truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable()); - - // add tablet to inverted index - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (Partition partition : info.getPartitions()) { - oldPartitions.add(partition); - long partitionId = partition.getId(); - TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) - .getStorageMedium(); - for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) { - long indexId = mIndex.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - for (Tablet tablet : mIndex.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId, - schemaHash, medium); - long tabletId = tablet.getId(); - invertedIndex.addTablet(tabletId, tabletMeta); - for (Replica replica : tablet.getReplicas()) { - invertedIndex.addReplica(tabletId, replica); - } - } - } - } + Map recyclePartitionParamMap = new HashMap<>(); + truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable(), + recyclePartitionParamMap, isForceDrop); } finally { olapTable.writeUnlock(); } - - if 
(!Env.isCheckpointThread()) { - erasePartitionDropBackendReplicas(oldPartitions); - } } public void replayAlterExternalTableSchema(String dbName, String tableName, List newSchema) diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java index b252b2a38233f73..ffb59d2afdffd9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/TruncateTableInfo.java @@ -49,13 +49,15 @@ public class TruncateTableInfo implements Writable { private String rawSql = ""; @SerializedName(value = "op") private Map oldPartitions = new HashMap<>(); + @SerializedName(value = "force") + private boolean force = true; // older version it was forced always. public TruncateTableInfo() { } public TruncateTableInfo(long dbId, String db, long tblId, String table, List partitions, - boolean isEntireTable, String rawSql, List oldPartitions) { + boolean isEntireTable, String rawSql, List oldPartitions, boolean force) { this.dbId = dbId; this.db = db; this.tblId = tblId; @@ -66,6 +68,7 @@ public TruncateTableInfo(long dbId, String db, long tblId, String table, List