Skip to content

Commit

Permalink
add partition check
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde committed Oct 15, 2024
1 parent 9000b20 commit 198e290
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static Pair<Boolean, String> compareDynamicPartition(OlapTable originalTa
@VisibleForTesting
public static Pair<Boolean, String> compareAutoPartition(OlapTable originalTable,
OlapTable relatedTable) throws AnalysisException {
if (!isDynamicPartition(relatedTable)) {
if (!isAutoPartition(relatedTable)) {
return Pair.of(false, "relatedTable is not dynamic partition.");
}
FunctionIntervalInfo originalFunctionIntervalInfo = PartitionExprUtil.getFunctionIntervalInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPartitionCheckUtil;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.NereidsPlanner;
Expand Down Expand Up @@ -199,32 +201,74 @@ public static RelatedTableInfo getRelatedTableInfos(String column, String timeUn
"partition rollup expressions is not consistent, partition rollup expressions map is %s",
checkContext.getPartitionAndRollupExpressionChecked()));
}
List<TableColumnInfo> tableColumnInfos = new ArrayList<>();
List<TableIf> derivedRelatedTables = new ArrayList<>();
TableIf originalRelatedTable = null;
Map<TableIf, TableColumnInfo> tableAndPartitionMap = new HashMap<>();
Set<DataType> dataTypeSet = new HashSet<>();
for (Map.Entry<SlotReference, Pair<Optional<Expression>, Boolean>> entry
: checkContext.getPartitionAndRollupExpressionChecked().entrySet()) {
SlotReference partitionColumn = entry.getKey();
Column relatedColumn = extractColumn(partitionColumn);
if (relatedColumn == null) {
// Partition column is not from table
continue;
}
dataTypeSet.add(partitionColumn.getDataType());
if (dataTypeSet.size() > 1) {
return RelatedTableInfo.failWith(String.format(
"multi partition column data types are different, data type are %s", dataTypeSet));
}
if (!partitionColumn.isColumnFromTable()) {
return RelatedTableInfo.failWith(String.format(
"partition checked is not from table, partition rollup expressions map is %s",
checkContext.getPartitionAndRollupExpressionChecked()));
TableIf relatedTable = partitionColumn.getTable().get();
tableAndPartitionMap.put(relatedTable,
new TableColumnInfo(partitionColumn.getTable().map(BaseTableInfo::new).get(),
relatedColumn.getName(),
entry.getValue().key().orElse(null),
entry.getValue().value()));
if (entry.getValue().value()) {
originalRelatedTable = relatedTable;
} else {
derivedRelatedTables.add(relatedTable);
}
Column relatedColumn = extractColumn(partitionColumn);
tableColumnInfos.add(new TableColumnInfo(partitionColumn.getTable().map(BaseTableInfo::new).get(),
relatedColumn.getName(),
entry.getValue().key().orElse(null),
entry.getValue().value()));
}
if (!tableColumnInfos.isEmpty()) {
return RelatedTableInfo.successWith(tableColumnInfos);
if (originalRelatedTable == null) {
return RelatedTableInfo.failWith(String.format(
"original related table is null, err info is %s, partition checked expressions map is %s",
checkContext.getFailReasons(), checkContext.getPartitionAndRollupExpressionChecked()));
}
// If derivedRelatedTables is not empty, check partition between original and derived table is valid or not
Pair<Boolean, String> checkResult = MTMVPartitionCheckUtil.checkIfAllowMultiTablePartitionRefresh(
(MTMVRelatedTableIf) originalRelatedTable);
// TODO: 2024/10/15 Support external table in the future
TableColumnInfo originalTableColumnInfo = tableAndPartitionMap.get(originalRelatedTable);
if (!checkResult.key() || !(originalRelatedTable instanceof OlapTable)) {
// If original related table is invalid or is external table, degrade into single input trigger refresh
return RelatedTableInfo.successWith(ImmutableList.of(originalTableColumnInfo));
}
// TODO: 2024/10/15 Support external table in the future
derivedRelatedTables = derivedRelatedTables.stream()
.filter(tableIf -> MTMVPartitionCheckUtil.checkIfAllowMultiTablePartitionRefresh(
(MTMVRelatedTableIf) tableIf).key() && tableIf instanceof OlapTable)
.collect(Collectors.toList());
if (derivedRelatedTables.isEmpty()) {
// If valid derivedRelatedTables is empty, degrade into single input trigger refresh
return RelatedTableInfo.successWith(ImmutableList.of(originalTableColumnInfo));
}
List<TableColumnInfo> checkedTableColumnInfos = new ArrayList<>();
checkedTableColumnInfos.add(originalTableColumnInfo);
for (TableIf derivedTable : derivedRelatedTables) {
try {
// Check that the original related table's partitioning matches that of each derived related table
Pair<Boolean, String> derivedCheckResult = MTMVPartitionCheckUtil.compareOriginalTableAndRelatedTable(
(OlapTable) originalRelatedTable,
(OlapTable) derivedTable);
if (derivedCheckResult.key() && tableAndPartitionMap.get(derivedTable) != null) {
checkedTableColumnInfos.add(tableAndPartitionMap.get(derivedTable));
}
} catch (AnalysisException e) {
// Nothing to do: a derived table whose partition comparison throws is simply not added
}
}
return RelatedTableInfo.failWith("can't not find valid partition track column finally, "
+ checkContext.getFailReasons());
return RelatedTableInfo.successWith(checkedTableColumnInfos);
}

private static boolean checkRollupExpression(Collection<Pair<Optional<Expression>, Boolean>> rollupExpressions) {
Expand Down Expand Up @@ -839,6 +883,10 @@ private static Column extractColumn(SlotReference slotReference) {
if (!slotReferenceColumn.isPresent()) {
return null;
}
if (!slotReference.isColumnFromTable()) {
// Column is not from table
return null;
}
Expr definExpr = slotReferenceColumn.get().getDefineExpr();
if (definExpr instanceof SlotRef) {
// If slotReference is from sync mv when rbo, should get actual column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,49 @@ protected void runBeforeAll() throws Exception {
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ")");

createTable("CREATE TABLE IF NOT EXISTS lineitem_auto (\n"
+ " L_ORDERKEY INTEGER NOT NULL,\n"
+ " L_PARTKEY INTEGER NOT NULL,\n"
+ " L_SUPPKEY INTEGER NOT NULL,\n"
+ " L_LINENUMBER INTEGER NOT NULL,\n"
+ " L_QUANTITY DECIMALV3(15,2) NOT NULL,\n"
+ " L_EXTENDEDPRICE DECIMALV3(15,2) NOT NULL,\n"
+ " L_DISCOUNT DECIMALV3(15,2) NOT NULL,\n"
+ " L_TAX DECIMALV3(15,2) NOT NULL,\n"
+ " L_RETURNFLAG CHAR(1) NOT NULL,\n"
+ " L_LINESTATUS CHAR(1) NOT NULL,\n"
+ " L_SHIPDATE DATE NOT NULL,\n"
+ " L_COMMITDATE DATE NOT NULL,\n"
+ " L_RECEIPTDATE DATE NOT NULL,\n"
+ " L_SHIPINSTRUCT CHAR(25) NOT NULL,\n"
+ " L_SHIPMODE CHAR(10) NOT NULL,\n"
+ " L_COMMENT VARCHAR(44) NOT NULL\n"
+ ")\n"
+ "DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)\n"
+ "auto partition by range (date_trunc(`L_SHIPDATE`, 'day')) ()\n"
+ "DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ")");

createTable("CREATE TABLE IF NOT EXISTS orders_auto (\n"
+ " O_ORDERKEY INTEGER NOT NULL,\n"
+ " O_CUSTKEY INTEGER NOT NULL,\n"
+ " O_ORDERSTATUS CHAR(1) NOT NULL,\n"
+ " O_TOTALPRICE DECIMALV3(15,2) NOT NULL,\n"
+ " O_ORDERDATE DATE NOT NULL,\n"
+ " O_ORDERPRIORITY CHAR(15) NOT NULL, \n"
+ " O_CLERK CHAR(15) NOT NULL, \n"
+ " O_SHIPPRIORITY INTEGER NOT NULL,\n"
+ " O_COMMENT VARCHAR(79) NOT NULL\n"
+ ")\n"
+ "DUPLICATE KEY(O_ORDERKEY, O_CUSTKEY)\n"
+ "auto partition by range (date_trunc(`O_ORDERDATE`, 'day')) ()\n"
+ "DISTRIBUTED BY HASH(O_ORDERKEY) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ " \"replication_num\" = \"1\"\n"
+ ")");
// Should not make scan to empty relation when the table used by materialized view has no data
connectContext.getSessionVariable().setDisableNereidsRules("OLAP_SCAN_PARTITION_PRUNE,PRUNE_EMPTY_PARTITION");
}
Expand Down Expand Up @@ -198,8 +241,26 @@ public void test5() {
MaterializedViewUtils.getRelatedTableInfos("l_shipdate", null,
rewrittenPlan, nereidsPlanner.getCascadesContext());
successWith(relatedTableInfo,
ImmutableSet.of(Pair.of("lineitem", "l_shipdate"),
Pair.of("orders", "o_orderdate")), "");
ImmutableSet.of(Pair.of("lineitem", "l_shipdate")), "");
});
}

/**
 * Inner join of the two auto-partitioned tables on their date columns, tracked by
 * {@code l_shipdate}: both {@code lineitem_auto.l_shipdate} and
 * {@code orders_auto.o_orderdate} are expected to be reported as related partition columns.
 */
@Test
public void test502() {
    String joinSql = " select l_shipdate, o_orderdate, count(l_shipdate) \n"
            + " from lineitem_auto\n"
            + " inner join orders_auto\n"
            + " on l_shipdate = o_orderdate\n"
            + " group by l_shipdate, o_orderdate";
    PlanChecker.from(connectContext).checkExplain(joinSql, planner -> {
        // Resolve the related tables against the rewritten plan, using l_shipdate as the partition column
        RelatedTableInfo actualInfo = MaterializedViewUtils.getRelatedTableInfos(
                "l_shipdate", null, planner.getRewrittenPlan(), planner.getCascadesContext());
        successWith(actualInfo,
                ImmutableSet.of(
                        Pair.of("lineitem_auto", "l_shipdate"),
                        Pair.of("orders_auto", "o_orderdate")),
                "");
    });
}

Expand Down Expand Up @@ -253,8 +314,26 @@ public void test6() {
MaterializedViewUtils.getRelatedTableInfos("o_orderdate", null,
rewrittenPlan, nereidsPlanner.getCascadesContext());
successWith(relatedTableInfo,
ImmutableSet.of(Pair.of("lineitem", "l_shipdate"),
Pair.of("orders", "o_orderdate")), "");
ImmutableSet.of(Pair.of("orders", "o_orderdate")), "");
});
}

/**
 * Inner join of the two auto-partitioned tables on their date columns, tracked by
 * {@code o_orderdate}: both {@code lineitem_auto.l_shipdate} and
 * {@code orders_auto.o_orderdate} are expected to be reported as related partition columns.
 */
@Test
public void test601() {
    String joinSql = " select l_shipdate, o_orderdate, count(l_shipdate) \n"
            + " from lineitem_auto\n"
            + " inner join orders_auto\n"
            + " on l_shipdate = o_orderdate\n"
            + " group by l_shipdate, o_orderdate";
    PlanChecker.from(connectContext).checkExplain(joinSql, planner -> {
        // Resolve the related tables against the rewritten plan, using o_orderdate as the partition column
        RelatedTableInfo actualInfo = MaterializedViewUtils.getRelatedTableInfos(
                "o_orderdate", null, planner.getRewrittenPlan(), planner.getCascadesContext());
        successWith(actualInfo,
                ImmutableSet.of(
                        Pair.of("lineitem_auto", "l_shipdate"),
                        Pair.of("orders_auto", "o_orderdate")),
                "");
    });
}

Expand Down

0 comments on commit 198e290

Please sign in to comment.