Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde committed Sep 29, 2024
1 parent d06b1c4 commit c79412a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static RelatedTableInfo getRelatedTableInfo(String column, String timeUni
return RelatedTableInfo.successWith(
partitionColumn.getTable().map(BaseTableInfo::new).get(),
true,
partitionColumn.getColumn().get().getName(),
extractColumn(partitionColumn).getName(),
entry.getValue().key().orElse(null),
entry.getValue().value());
}
Expand Down Expand Up @@ -209,7 +209,7 @@ public static Set<RelatedTableInfo> getRelatedTableInfos(String column, String t
relatedTableInfos.add(RelatedTableInfo.successWith(
partitionColumn.getTable().map(BaseTableInfo::new).get(),
true,
partitionColumn.getColumn().get().getName(),
extractColumn(partitionColumn).getName(),
entry.getValue().key().orElse(null),
entry.getValue().value()));
}
Expand Down Expand Up @@ -560,15 +560,6 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte
+ "Or the partition column is in the invalid table");
return null;
}
// Check the table which mv partition column belonged to is same as the current check relation or not
if (partitionColumnsToCheck.stream().noneMatch(columnSlot ->
columnSlot.getTable().map(TableIf::getFullQualifiers).orElse(ImmutableList.of())
.equals(((LogicalCatalogRelation) relation).getTable().getFullQualifiers()))) {
context.addFailReason(String.format("mv partition column name is not belonged to current check , "
+ "table, current table is %s",
((LogicalCatalogRelation) relation).getTable().getFullQualifiers()));
return null;
}
LogicalCatalogRelation logicalCatalogRelation = (LogicalCatalogRelation) relation;
TableIf table = logicalCatalogRelation.getTable();
if (!(table instanceof MTMVRelatedTableIf)) {
Expand All @@ -585,14 +576,12 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte
}
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns());
for (SlotReference partitionColumn : partitionColumnsToCheck) {
Column mvReferenceColumn = partitionColumn.getColumn().get();
Expr definExpr = mvReferenceColumn.getDefineExpr();
if (definExpr instanceof SlotRef) {
Column referenceRollupColumn = ((SlotRef) definExpr).getColumn();
if (referenceRollupColumn != null) {
mvReferenceColumn = referenceRollupColumn;
}
if (!partitionColumn.getTable().map(TableIf::getFullQualifiers).orElse(ImmutableList.of())
.equals(((LogicalCatalogRelation) relation).getTable().getFullQualifiers())) {
// mv partition column name is not belonged to current table, continue check
continue;
}
Column mvReferenceColumn = extractColumn(partitionColumn);
if (partitionColumnSet.contains(mvReferenceColumn)
&& (!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull())) {
SlotReference currentPartitionSlot = null;
Expand All @@ -603,12 +592,12 @@ public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerConte
currentPartitionSlot = (SlotReference) catalogSlot;
}
}
// If self join or partition is in invalid side,
// If self join such as inner join or partition is in invalid side such as null generate side,
// should also check the partition column is in the shuttled equal set
boolean tableChecked = context.getPartitionAndRollupExpressionChecked().keySet().stream()
.anyMatch(slot -> Objects.equals(slot.getTable().map(BaseTableInfo::new).orElse(null),
new BaseTableInfo(table)));
if (tableChecked || context.getInvalidTableSet().contains(new BaseTableInfo(table))) {
if (tableChecked || context.getInvalidCatalogRelation().contains(relation)) {
boolean checkSuccess = false;
for (Set<Slot> equalSlotSet : context.getShttuledEqualSlotSet()) {
checkSuccess = equalSlotSet.contains(partitionColumn)
Expand Down Expand Up @@ -827,6 +816,22 @@ private static boolean checkPartition(Collection<? extends Expression> expressio
}
}

private static Column extractColumn(SlotReference slotReference) {
Optional<Column> slotReferenceColumn = slotReference.getColumn();
if (!slotReferenceColumn.isPresent()) {
return null;
}
Expr definExpr = slotReferenceColumn.get().getDefineExpr();
if (definExpr instanceof SlotRef) {
// If slotReference is from sync mv when rbo, should get actual column
Column referenceRollupColumn = ((SlotRef) definExpr).getColumn();
if (referenceRollupColumn != null) {
return referenceRollupColumn;
}
}
return slotReferenceColumn.get();
}

private static final class IncrementCheckerContext {
// This is used to record partition slot, and the map value is rollup expression and bool value which
// identify it's original partition or not
Expand All @@ -838,7 +843,7 @@ private static final class IncrementCheckerContext {
private final CascadesContext cascadesContext;
// This record the invalid table, such as the right side of left join, the partition column
// is invalid if is form the table
private final Set<BaseTableInfo> invalidTableSet = new HashSet<>();
private final Set<LogicalCatalogRelation> invalidCatalogRelation = new HashSet<>();
// This is used to check multi join input partition slot is in the join equal slot set or not
// if not, can not multi join input trigger partition update
private final Set<Set<Slot>> shttuledEqualSlotSet = new HashSet<>();
Expand All @@ -857,8 +862,8 @@ public void addFailReason(String failReason) {
this.failReasons.add(failReason);
}

public Set<BaseTableInfo> getInvalidTableSet() {
return invalidTableSet;
public Set<LogicalCatalogRelation> getInvalidCatalogRelation() {
return invalidCatalogRelation;
}

public CascadesContext getCascadesContext() {
Expand Down Expand Up @@ -926,15 +931,16 @@ public void addEqualSlotSet(Set<Slot> equalSet, List<Plan> planContext) {
}

public void collectInvalidTableSet(Plan plan) {
plan.accept(new DefaultPlanVisitor<Void, Set<BaseTableInfo>>() {
plan.accept(new DefaultPlanVisitor<Void, Set<LogicalCatalogRelation>>() {
@Override
public Void visitLogicalRelation(LogicalRelation relation, Set<BaseTableInfo> invalidTableSet) {
public Void visitLogicalRelation(LogicalRelation relation,
Set<LogicalCatalogRelation> invalidTableSet) {
if (relation instanceof LogicalCatalogRelation) {
invalidTableSet.add(new BaseTableInfo(((LogicalCatalogRelation) relation).getTable()));
invalidTableSet.add((LogicalCatalogRelation) relation);
}
return null;
}
}, this.invalidTableSet);
}, this.invalidCatalogRelation);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,24 @@ public void test500() {
});
}

@Test
public void test501() {
PlanChecker.from(connectContext)
.checkExplain(" select l_shipdate_alias, o.o_orderdate_alias, count(l_shipdate_alias) \n"
+ " from (select date_trunc(l_shipdate, 'day') l_shipdate_alias from lineitem) l\n"
+ " inner join (select date_trunc(o_orderdate, 'day') o_orderdate_alias from orders) o\n"
+ " on l_shipdate_alias = o.o_orderdate_alias\n"
+ " group by l_shipdate_alias, o.o_orderdate_alias",
nereidsPlanner -> {
Plan rewrittenPlan = nereidsPlanner.getRewrittenPlan();
Set<RelatedTableInfo> relatedTableInfos =
MaterializedViewUtils.getRelatedTableInfos("l_shipdate_alias", "month",
rewrittenPlan, nereidsPlanner.getCascadesContext());
successWith(relatedTableInfos,
ImmutableSet.of(Pair.of("lineitem", "l_shipdate")), "month");
});
}

// inner join + not self join + partition in join condition + invalid side
@Test
public void test6() {
Expand Down Expand Up @@ -755,7 +773,8 @@ private static void successWith(Set<RelatedTableInfo> relatedTableInfos,
Assertions.assertTrue(partitionExpression.isPresent());
List<DateTrunc> dateTruncs = partitionExpression.get().collectToList(DateTrunc.class::isInstance);
Assertions.assertEquals(1, dateTruncs.size());
Assertions.assertEquals(dateTruncs.get(0).getArgument(1).toString().toLowerCase(), timeUnit);
Assertions.assertEquals(dateTruncs.get(0).getArgument(1).toString().toLowerCase(),
"'" + timeUnit + "'");
}
if (StringUtils.isEmpty(timeUnit)) {
Assertions.assertFalse(partitionExpression.isPresent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ suite("self_conn_range_date_increment_create") {

// change left table data
// create mv base on left table with partition col
sql_error_list = [mv_sql_1, mv_sql_3, mv_sql_7, mv_sql_9, mv_sql_10, mv_sql_12,
mv_sql_13, mv_sql_15, mv_sql_16, mv_sql_18]
sql_error_list = [mv_sql_4, mv_sql_6, mv_sql_7, mv_sql_8, mv_sql_9, mv_sql_10, mv_sql_11, mv_sql_12,
mv_sql_13, mv_sql_14, mv_sql_15, mv_sql_16, mv_sql_17, mv_sql_18]
list_judgement(sql_all_list, sql_increment_list, sql_complete_list, sql_error_list,
partition_by_part_col, primary_tb_change, is_complete_change)

Expand Down

0 comments on commit c79412a

Please sign in to comment.