Skip to content

Commit

Permalink
[Feature](partitions) Support auto partition FE part
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Sep 8, 2023
1 parent c68e6a9 commit 837cfd6
Show file tree
Hide file tree
Showing 18 changed files with 787 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ public final class FeMetaVersion {
public static final int VERSION_123 = 123;
// For auto-increment column
public static final int VERSION_124 = 124;
// For write/read auto create partition expr
public static final int VERSION_125 = 125;

// note: when incrementing the meta version, assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_124;
public static final int VERSION_CURRENT = VERSION_125;

// every log's meta version should be >= the minimum version, so that we can remove many if clauses, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
23 changes: 22 additions & 1 deletion fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -3228,7 +3228,28 @@ opt_partition ::=
| KW_PARTITION KW_BY KW_LIST LPAREN ident_list:columns RPAREN
LPAREN opt_all_partition_desc_list:list RPAREN
{:
RESULT = new ListPartitionDesc(columns, list);
RESULT = new ListPartitionDesc(columns, list);
:}
/* expr range partition */
| KW_AUTO KW_PARTITION KW_BY KW_RANGE function_call_expr:fnExpr
LPAREN opt_all_partition_desc_list:list RPAREN
{:
ArrayList<Expr> exprs = new ArrayList<Expr>();
exprs.add(fnExpr);
RESULT = RangePartitionDesc.createRangePartitionDesc(exprs, list);
:}
/* expr list partition */
| KW_AUTO KW_PARTITION KW_BY KW_LIST LPAREN expr_list:exprs RPAREN
LPAREN opt_all_partition_desc_list:list RPAREN
{:
RESULT = ListPartitionDesc.createListPartitionDesc(exprs, list);
:}
| KW_AUTO KW_PARTITION KW_BY KW_LIST function_call_expr:fnExpr
LPAREN opt_all_partition_desc_list:list RPAREN
{:
ArrayList<Expr> exprs = new ArrayList<Expr>();
exprs.add(fnExpr);
RESULT = ListPartitionDesc.createListPartitionDesc(exprs, list);
:}
;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ public ListPartitionDesc(List<String> partitionColNames,
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
super(partitionColNames, allPartitionDescs);
type = PartitionType.LIST;
this.isAutoCreatePartitions = false;
}

/**
 * Builds a LIST partition description whose partitions are created automatically.
 *
 * <p>Unlike the (partitionColNames, allPartitionDescs) constructor, this one does not pass its
 * arguments to the superclass constructor; it assigns the fields itself and marks the
 * description with {@code isAutoCreatePartitions = true}.
 *
 * @param exprs partition expressions; when null, {@code partitionExprs} is left untouched
 * @param partitionColNames names of the partition columns
 * @param allPartitionDescs explicitly declared partitions, converted by handleAllPartitionDesc
 * @throws AnalysisException declared because handleAllPartitionDesc can throw it
 */
public ListPartitionDesc(ArrayList<Expr> exprs, List<String> partitionColNames,
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
if (exprs != null) {
this.partitionExprs = exprs;
}
// NOTE(review): keep this assignment ahead of handleAllPartitionDesc(); that helper appears to
// read partitionColNames (it reports partitionColNames.size() in an error message) — confirm
// before reordering.
this.partitionColNames = partitionColNames;
this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
this.type = PartitionType.LIST;
this.isAutoCreatePartitions = true;
}

/**
 * Factory for an auto-create LIST partition description.
 *
 * <p>The partition column names are not supplied by the caller; they are derived from
 * {@code exprs} using the list-partition rules of {@code getColNamesFromExpr}.
 *
 * @param exprs partition expressions written in the PARTITION BY LIST clause
 * @param allPartitionDescs explicitly declared partitions
 * @return a new auto-create list partition description
 * @throws AnalysisException if the column names cannot be derived or the description is invalid
 */
public static ListPartitionDesc createListPartitionDesc(ArrayList<Expr> exprs,
        List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
    // The literal `true` selects list-partition validation when extracting column names.
    return new ListPartitionDesc(exprs, getColNamesFromExpr(exprs, true), allPartitionDescs);
}

@Override
Expand Down Expand Up @@ -100,7 +118,8 @@ public PartitionInfo toPartitionInfo(List<Column> schema, Map<String, Long> part
}
}

ListPartitionInfo listPartitionInfo = new ListPartitionInfo(partitionColumns);
ListPartitionInfo listPartitionInfo = new ListPartitionInfo(this.isAutoCreatePartitions, this.partitionExprs,
partitionColumns);
for (SinglePartitionDesc desc : singlePartitionDescs) {
long partitionId = partitionNameToId.get(desc.getPartitionName());
listPartitionInfo.handleNewSinglePartitionDesc(desc, partitionId, isTemp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,38 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.NotImplementedException;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionDesc {
protected List<String> partitionColNames;
protected List<SinglePartitionDesc> singlePartitionDescs;

protected ArrayList<Expr> partitionExprs; //eg: auto partition by range date_trunc(column, 'day')
protected boolean isAutoCreatePartitions;
protected PartitionType type;
public static final ImmutableSet<String> RANGE_PARTITION_FUNCTIONS = new ImmutableSortedSet.Builder<String>(
String.CASE_INSENSITIVE_ORDER).add("date_trunc").add("date_ceil").add("date_floor")
.build();

public PartitionDesc() {}

/**
 * Creates a partition description from column names and the declared partitions.
 *
 * @param partitionColNames names of the partition columns
 * @param allPartitionDescs declared partitions, converted by handleAllPartitionDesc
 * @throws AnalysisException declared because handleAllPartitionDesc can throw it
 */
public PartitionDesc(List<String> partitionColNames,
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
// NOTE(review): assigned first on purpose — handleAllPartitionDesc() appears to read
// partitionColNames (it reports partitionColNames.size() in an error message); confirm
// before reordering.
this.partitionColNames = partitionColNames;
this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
}

public List<SinglePartitionDesc> handleAllPartitionDesc(List<AllPartitionDesc> allPartitionDescs)
throws AnalysisException {
boolean isMultiPartition = false;
List<SinglePartitionDesc> tmpList = Lists.newArrayList();
if (allPartitionDescs != null) {
Expand All @@ -65,7 +77,7 @@ public PartitionDesc(List<String> partitionColNames,
throw new AnalysisException("multi partition column size except 1 but provided "
+ partitionColNames.size() + ".");
}
this.singlePartitionDescs = tmpList;
return tmpList;
}

public List<SinglePartitionDesc> getSinglePartitionDescs() {
Expand All @@ -85,6 +97,62 @@ public List<String> getPartitionColNames() {
return partitionColNames;
}

// 1. partition by list (column) : now support one slotRef
// 2. partition by range(column/function(column)) : support slotRef and some
// special function eg: date_trunc, date_floor/ceil
/**
 * Extracts the single partition column name referenced by auto-partition expressions.
 *
 * <p>Accepted expression shapes:
 * <ul>
 *   <li>list partition: a plain column reference ({@code SlotRef});</li>
 *   <li>range partition: a plain column reference, or a call to one of
 *       {@code RANGE_PARTITION_FUNCTIONS} whose parameters contain the column reference.</li>
 * </ul>
 * At most one column reference is allowed across all expressions.
 *
 * @param exprs partition expressions from the PARTITION BY clause
 * @param isListPartition true to apply list-partition rules, false for range-partition rules
 * @return a list holding exactly one column name
 * @throws AnalysisException if {@code exprs} is null or empty, contains an unsupported
 *         expression, references more than one column, or references no column at all
 */
public static List<String> getColNamesFromExpr(ArrayList<Expr> exprs, boolean isListPartition)
        throws AnalysisException {
    // Reject missing input up front so callers get an AnalysisException instead of a
    // NullPointerException from the loop or an IndexOutOfBoundsException from exprs.get(0).
    if (exprs == null || exprs.isEmpty()) {
        throw new AnalysisException("auto create partition have not find any partition columns.");
    }
    List<String> colNames = new ArrayList<>();
    for (Expr expr : exprs) {
        if ((expr instanceof FunctionCallExpr) && !isListPartition) {
            FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
            String name = functionCallExpr.getFnName().getFunction();
            if (!RANGE_PARTITION_FUNCTIONS.contains(name)) {
                throw new AnalysisException(
                        "auto create partition only support function call expr is date_trunc/date_floor/date_ceil. "
                                + expr.toSql());
            }
            // Only column references among the parameters matter; literals such as the
            // time unit are ignored here.
            for (Expr param : functionCallExpr.getParams().exprs()) {
                if (!(param instanceof SlotRef)) {
                    continue;
                }
                if (!colNames.isEmpty()) {
                    throw new AnalysisException(
                            "auto create partition only support one slotRef in function expr. "
                                    + expr.toSql());
                }
                colNames.add(((SlotRef) param).getColumnName());
            }
        } else if (expr instanceof SlotRef) {
            if (!colNames.isEmpty()) {
                throw new AnalysisException(
                        "auto create partition only support one slotRef in expr. "
                                + expr.toSql());
            }
            colNames.add(((SlotRef) expr).getColumnName());
        } else if (!isListPartition) {
            // Note the leading space in " function": the original concatenation produced
            // "date_ceilfunction".
            throw new AnalysisException(
                    "auto create partition only support slotRef and date_trunc/date_floor/date_ceil"
                            + " function in range partitions. " + expr.toSql());
        } else {
            throw new AnalysisException(
                    "auto create partition only support slotRef in list partitions. "
                            + expr.toSql());
        }
    }
    // A supported function call without any column parameter ends up here.
    if (colNames.isEmpty()) {
        throw new AnalysisException(
                "auto create partition have not find any partition columns. "
                        + exprs.get(0).toSql());
    }
    return colNames;
}

public void analyze(List<ColumnDef> columnDefs, Map<String, String> otherProperties) throws AnalysisException {
if (partitionColNames == null || partitionColNames.isEmpty()) {
throw new AnalysisException("No partition columns.");
Expand Down Expand Up @@ -128,6 +196,15 @@ public void analyze(List<ColumnDef> columnDefs, Map<String, String> otherPropert
if (this instanceof ListPartitionDesc && columnDef.isAllowNull()) {
throw new AnalysisException("The list partition column must be NOT NULL");
}
if (this instanceof RangePartitionDesc && partitionExprs != null) {
if (partitionExprs.get(0) instanceof FunctionCallExpr) {
if (!columnDef.getType().isDatetime() && !columnDef.getType().isDatetimeV2()) {
throw new AnalysisException(
"auto create partition function expr need datetime/datetimev2 type. "
+ partitionExprs.get(0).toSql());
}
}
}
found = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TStringLiteral;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionExprUtil {
public static final String DATETIME_FORMATTER = "%04d-%02d-%02d %02d:%02d:%02d";
public static final String DATE_FORMATTER = "%04d-%02d-%02d";
public static final String DATETIME_NAME_FORMATTER = "%04d%02d%02d%02d%02d%02d";
private static final Logger LOG = LogManager.getLogger(PartitionExprUtil.class);
private static final PartitionExprUtil partitionExprUtil = new PartitionExprUtil();

public static FunctionIntervalInfo getFunctionIntervalInfo(ArrayList<Expr> partitionExprs,
PartitionType partitionType) throws AnalysisException {
if (partitionType != PartitionType.RANGE) {
return null;
}
if (partitionExprs.size() != 1) {
throw new AnalysisException("now only support one expr in range partition");
}

Expr e = partitionExprs.get(0);
if (!(e instanceof FunctionCallExpr)) {
throw new AnalysisException("now range partition only support FunctionCallExpr");
}
FunctionCallExpr functionCallExpr = (FunctionCallExpr) e;
String fnName = functionCallExpr.getFnName().getFunction();
String timeUnit;
int interval;
if ("date_trunc".equalsIgnoreCase(fnName)) {
List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
if (paramsExprs.size() != 2) {
throw new AnalysisException("date_trunc params exprs size should be 2.");
}
Expr param = paramsExprs.get(1);
if (!(param instanceof StringLiteral)) {
throw new AnalysisException("date_trunc param of time unit is not string literal.");
}
timeUnit = ((StringLiteral) param).getStringValue().toLowerCase();
interval = 1;
} else {
throw new AnalysisException("now range partition only support date_trunc.");
}
return partitionExprUtil.new FunctionIntervalInfo(timeUnit, interval);
}

public static DateLiteral getRangeEnd(DateLiteral beginTime, FunctionIntervalInfo intervalInfo)
throws AnalysisException {
String timeUnit = intervalInfo.timeUnit;
int interval = intervalInfo.interval;
switch (timeUnit) {
case "year":
return beginTime.plusYears(interval);
case "month":
return beginTime.plusMonths(interval);
case "day":
return beginTime.plusDays(interval);
case "hour":
return beginTime.plusHours(interval);
case "minute":
return beginTime.plusMinutes(interval);
case "second":
return beginTime.plusSeconds(interval);
default:
break;
}
return null;
}

public static Map<String, AddPartitionClause> getAddPartitionClauseFromPartitionValues(OlapTable olapTable,
ArrayList<TStringLiteral> partitionValues, PartitionInfo partitionInfo)
throws AnalysisException {
Map<String, AddPartitionClause> result = Maps.newHashMap();
ArrayList<Expr> partitionExprs = partitionInfo.getPartitionExprs();
PartitionType partitionType = partitionInfo.getType();
List<Column> partiitonColumn = partitionInfo.getPartitionColumns();
Type partitionColumnType = partiitonColumn.get(0).getType();
FunctionIntervalInfo intervalInfo = getFunctionIntervalInfo(partitionExprs, partitionType);
Set<String> filterPartitionValues = new HashSet<String>();

for (TStringLiteral partitionValue : partitionValues) {
PartitionKeyDesc partitionKeyDesc = null;
String partitionName = "p";
String value = partitionValue.value;
if (filterPartitionValues.contains(value)) {
continue;
}
filterPartitionValues.add(value);
if (partitionType == PartitionType.RANGE) {
String beginTime = value;
DateLiteral beginDateTime = new DateLiteral(beginTime, Type.DATETIMEV2);
partitionName += String.format(DATETIME_NAME_FORMATTER,
beginDateTime.getYear(), beginDateTime.getMonth(), beginDateTime.getDay(),
beginDateTime.getHour(), beginDateTime.getMinute(), beginDateTime.getSecond());
DateLiteral endDateTime = getRangeEnd(beginDateTime, intervalInfo);
partitionKeyDesc = createPartitionKeyDescWithRange(beginDateTime, endDateTime, partitionColumnType);
} else if (partitionType == PartitionType.LIST) {
List<List<PartitionValue>> listValues = new ArrayList<>();
// TODO: need to support any type
String pointValue = value;
PartitionValue lowerValue = new PartitionValue(pointValue);
listValues.add(Collections.singletonList(lowerValue));
partitionKeyDesc = PartitionKeyDesc.createIn(
listValues);
partitionName += lowerValue.getStringValue();
} else {
throw new AnalysisException("now only support range and list partition");
}

Map<String, String> partitionProperties = Maps.newHashMap();
DistributionDesc distributionDesc = olapTable.getDefaultDistributionInfo().toDistributionDesc();

SinglePartitionDesc singleRangePartitionDesc = new SinglePartitionDesc(true, partitionName,
partitionKeyDesc, partitionProperties);

AddPartitionClause addPartitionClause = new AddPartitionClause(singleRangePartitionDesc,
distributionDesc, partitionProperties, false);
result.put(partitionName, addPartitionClause);
}
return result;
}

public static PartitionKeyDesc createPartitionKeyDescWithRange(DateLiteral beginDateTime,
DateLiteral endDateTime, Type partitionColumnType) throws AnalysisException {
String beginTime;
String endTime;
// maybe need check the range in FE also, like getAddPartitionClause.
if (partitionColumnType.isDate() || partitionColumnType.isDateV2()) {
beginTime = String.format(DATE_FORMATTER, beginDateTime.getYear(), beginDateTime.getMonth(),
beginDateTime.getDay());
endTime = String.format(DATE_FORMATTER, endDateTime.getYear(), endDateTime.getMonth(),
endDateTime.getDay());
} else if (partitionColumnType.isDatetime() || partitionColumnType.isDatetimeV2()) {
beginTime = String.format(DATETIME_FORMATTER,
beginDateTime.getYear(), beginDateTime.getMonth(), beginDateTime.getDay(),
beginDateTime.getHour(), beginDateTime.getMinute(), beginDateTime.getSecond());
endTime = String.format(DATETIME_FORMATTER,
endDateTime.getYear(), endDateTime.getMonth(), endDateTime.getDay(),
endDateTime.getHour(), endDateTime.getMinute(), endDateTime.getSecond());
} else {
throw new AnalysisException(
"not support range partition with column type : " + partitionColumnType.toString());
}
PartitionValue lowerValue = new PartitionValue(beginTime);
PartitionValue upperValue = new PartitionValue(endTime);
return PartitionKeyDesc.createFixed(
Collections.singletonList(lowerValue),
Collections.singletonList(upperValue));
}

public class FunctionIntervalInfo {
public String timeUnit;
public int interval;

public FunctionIntervalInfo(String timeUnit, int interval) {
this.timeUnit = timeUnit;
this.interval = interval;
}
}
}
Loading

0 comments on commit 837cfd6

Please sign in to comment.