Skip to content

Commit

Permalink
Merge branch 'master' into 20240702_fix_free_jemalloc_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Jul 3, 2024
2 parents 4211565 + cb795bd commit 57e523e
Show file tree
Hide file tree
Showing 70 changed files with 1,818 additions and 2,032 deletions.
13 changes: 13 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_min_max.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ template <typename Data>
struct AggregateFunctionMaxData : public Data {
using Self = AggregateFunctionMaxData;
using Data::IsFixedLength;
constexpr static bool IS_ANY = false;

AggregateFunctionMaxData() { reset(); }

Expand Down Expand Up @@ -476,6 +477,7 @@ template <typename Data>
struct AggregateFunctionMinData : Data {
using Self = AggregateFunctionMinData;
using Data::IsFixedLength;
constexpr static bool IS_ANY = false;

AggregateFunctionMinData() { reset(); }

Expand All @@ -502,6 +504,7 @@ template <typename Data>
struct AggregateFunctionAnyData : Data {
using Self = AggregateFunctionAnyData;
using Data::IsFixedLength;
constexpr static bool IS_ANY = true;

void change_if_better(const IColumn& column, size_t row_num, Arena* arena) {
this->change_first_time(column, row_num, arena);
Expand Down Expand Up @@ -542,6 +545,16 @@ class AggregateFunctionsSingleValue final
this->data(place).change_if_better(*columns[0], row_num, arena);
}

void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns,
Arena* arena) const override {
if constexpr (Data::IS_ANY) {
DCHECK_GT(batch_size, 0);
this->data(place).change_if_better(*columns[0], 0, arena);
} else {
Base::add_batch_single_place(batch_size, place, columns, arena);
}
}

void reset(AggregateDataPtr place) const override { this->data(place).reset(); }

void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ private boolean processModifyColumn(ModifyColumnClause alterClause, OlapTable ol
}
if (!modColumn.isKey()) {
if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
modColumn.setAggregationType(AggregateType.NONE, false);
modColumn.setAggregationType(AggregateType.NONE, true);
} else {
modColumn.setAggregationType(AggregateType.REPLACE, true);
}
Expand Down Expand Up @@ -1387,6 +1387,14 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long,
if (!alterColumn.equals(originSchema.get(i))) {
needAlterColumns.add(alterColumn);
hasColumnChange = true;
} else {
Column oriColumn = originSchema.get(i);
if ((oriColumn.getGeneratedColumnInfo() != null
|| alterColumn.getGeneratedColumnInfo() != null)
&& !oriColumn.getGeneratedColumnInfo().getExprSql()
.equals(alterColumn.getGeneratedColumnInfo().getExprSql())) {
throw new DdlException("Not supporting alter table modify generated columns.");
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
Expand Down Expand Up @@ -319,6 +320,16 @@ public void addTruncateTable(TruncateTableInfo info, long commitSeq) {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

public void addTableRename(TableInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = Lists.newArrayList();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.RENAME_TABLE;
String data = info.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}

// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
Expand Down
8 changes: 5 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -4637,7 +4637,8 @@ public void renameTable(Database db, Table table, String newTableName) throws Dd
db.unregisterTable(oldTableName);
db.registerTable(table);

TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName);
TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), oldTableName,
newTableName);
editLog.logTableRename(tableInfo);
LOG.info("rename table[{}] to {}", oldTableName, newTableName);
} finally {
Expand Down Expand Up @@ -4824,7 +4825,8 @@ public void renameRollup(Database db, OlapTable table, RollupRenameClause rename
indexNameToIdMap.put(newRollupName, indexId);

// log
TableInfo tableInfo = TableInfo.createForRollupRename(db.getId(), table.getId(), indexId, newRollupName);
TableInfo tableInfo = TableInfo.createForRollupRename(db.getId(), table.getId(), indexId,
rollupName, newRollupName);
editLog.logRollupRename(tableInfo);
LOG.info("rename rollup[{}] to {}", rollupName, newRollupName);
} finally {
Expand Down Expand Up @@ -4883,7 +4885,7 @@ public void renamePartition(Database db, OlapTable table, PartitionRenameClause

// log
TableInfo tableInfo = TableInfo.createForPartitionRename(db.getId(), table.getId(), partition.getId(),
newPartitionName);
partitionName, newPartitionName);
editLog.logPartitionRename(tableInfo);
LOG.info("rename partition[{}] to {}", partitionName, newPartitionName);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
Expand Down Expand Up @@ -476,6 +477,19 @@ public Boolean visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> so
return true;
}

@Override
public Boolean visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Void context) {
// process must shuffle
visit(topN, context);

// If child is DistributionSpecGather, topN should forbid two-phase topN
if (topN.getSortPhase() == SortPhase.LOCAL_SORT
&& childrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE)) {
return false;
}
return true;
}

/**
* check both side real output hash key order are same or not.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,21 @@ public void pruneSlots(Set<Slot> outputSlots) {
fdDgBuilder.removeNotContain(outputSlots);
}

public void replace(Map<Slot, Slot> replaceMap) {
public void replaceUniformBy(Map<Slot, Slot> replaceMap) {
uniformSet.replace(replaceMap);
}

public void replaceUniqueBy(Map<Slot, Slot> replaceMap) {
uniqueSet.replace(replaceMap);
}

public void replaceEqualSetBy(Map<Slot, Slot> replaceMap) {
equalSetBuilder.replace(replaceMap);
}

public void replaceFuncDepsBy(Map<Slot, Slot> replaceMap) {
fdDgBuilder.replace(replaceMap);
}
}

static class NestedSet {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ public DGItem(DGItem dgItem) {
this.parents = new HashSet<>(dgItem.parents);
this.children = new HashSet<>(dgItem.children);
}

public void replace(Map<Slot, Slot> replaceMap) {
Set<Slot> newSlots = new HashSet<>();
for (Slot slot : slots) {
newSlots.add(replaceMap.getOrDefault(slot, slot));
}
this.slots = newSlots;
}
}

private final Map<Set<Slot>, Integer> itemMap; // Maps sets of slots to their indices in the dgItems list
Expand Down Expand Up @@ -211,6 +219,21 @@ public void addDeps(FuncDepsDG funcDepsDG) {
}
}

public void replace(Map<Slot, Slot> replaceSlotMap) {
for (DGItem item : dgItems) {
item.replace(replaceSlotMap);
}
Map<Set<Slot>, Integer> newItemMap = new HashMap<>();
for (Entry<Set<Slot>, Integer> e : itemMap.entrySet()) {
Set<Slot> key = new HashSet<>();
for (Slot slot : e.getKey()) {
key.add(replaceSlotMap.getOrDefault(slot, slot));
}
newItemMap.put(key, e.getValue());
}
this.itemMap = newItemMap;
}

private DGItem getOrCreateNode(Set<Slot> slots) {
if (!itemMap.containsKey(slots)) {
itemMap.put(slots, dgItems.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ private Statistics computeCatalogRelation(CatalogRelation catalogRelation) {
builder.putColumnStatistics(slot, colStatsBuilder.build());
}
checkIfUnknownStatsUsedAsKey(builder);
return builder.build();
return builder.setRowCount(rowCount).build();
}

private Statistics computeTopN(TopN topN) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public LogicalProperties computeLogicalProperties() {
} else {
Supplier<List<Slot>> outputSupplier = Suppliers.memoize(this::computeOutput);
Supplier<DataTrait> fdSupplier = () -> this instanceof LogicalPlan
? ((LogicalPlan) this).computeFuncDeps()
? ((LogicalPlan) this).computeDataTrait()
: DataTrait.EMPTY_TRAIT;
return new LogicalProperties(outputSupplier, fdSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public interface BlockFuncDepsPropagation extends LogicalPlan {
@Override
default DataTrait computeFuncDeps() {
default DataTrait computeDataTrait() {
return DataTrait.EMPTY_TRAIT;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
public interface PropagateFuncDeps extends LogicalPlan {
@Override
default DataTrait computeFuncDeps() {
default DataTrait computeDataTrait() {
if (children().size() == 1) {
// Note when changing function dependencies, we always clone it.
// So it's safe to return a reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void validate(boolean isOlap, Set<String> keysSet, boolean isEnableMergeO
}

if (isOlap) {
if (!isKey && keysType.equals(KeysType.UNIQUE_KEYS)) {
if (!isKey && (keysType.equals(KeysType.UNIQUE_KEYS) || keysType.equals(KeysType.DUP_KEYS))) {
aggTypeImplicit = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,62 +132,42 @@ public ImmutableSet<FdItem> computeFdItems() {
return builder.build();
}

@Override
public void computeUnique(Builder builder) {
builder.addUniqueSlot(child(0).getLogicalProperties().getTrait());
if (qualifier == Qualifier.DISTINCT) {
builder.addUniqueSlot(ImmutableSet.copyOf(getOutput()));
}
Map<Slot, Slot> constructReplaceMapForChild(int index) {
Map<Slot, Slot> replaceMap = new HashMap<>();
List<Slot> output = getOutput();
List<? extends Slot> originalOutputs = regularChildrenOutputs.isEmpty()
? child(0).getOutput()
: regularChildrenOutputs.get(0);
? child(index).getOutput()
: regularChildrenOutputs.get(index);
for (int i = 0; i < output.size(); i++) {
replaceMap.put(originalOutputs.get(i), output.get(i));
}
builder.replace(replaceMap);
return replaceMap;
}

@Override
public void computeUnique(Builder builder) {
builder.addUniqueSlot(child(0).getLogicalProperties().getTrait());
if (qualifier == Qualifier.DISTINCT) {
builder.addUniqueSlot(ImmutableSet.copyOf(getOutput()));
}
builder.replaceUniqueBy(constructReplaceMapForChild(0));
}

@Override
public void computeEqualSet(DataTrait.Builder builder) {
builder.addEqualSet(child(0).getLogicalProperties().getTrait());
Map<Slot, Slot> replaceMap = new HashMap<>();
List<Slot> output = getOutput();
List<? extends Slot> originalOutputs = regularChildrenOutputs.isEmpty()
? child(0).getOutput()
: regularChildrenOutputs.get(0);
for (int i = 0; i < output.size(); i++) {
replaceMap.put(originalOutputs.get(i), output.get(i));
}
builder.replace(replaceMap);
builder.replaceEqualSetBy(constructReplaceMapForChild(0));
}

@Override
public void computeFd(DataTrait.Builder builder) {
builder.addFuncDepsDG(child(0).getLogicalProperties().getTrait());
Map<Slot, Slot> replaceMap = new HashMap<>();
List<Slot> output = getOutput();
List<? extends Slot> originalOutputs = regularChildrenOutputs.isEmpty()
? child(0).getOutput()
: regularChildrenOutputs.get(0);
for (int i = 0; i < output.size(); i++) {
replaceMap.put(originalOutputs.get(i), output.get(i));
}
builder.replace(replaceMap);
builder.replaceFuncDepsBy(constructReplaceMapForChild(0));
}

@Override
public void computeUniform(Builder builder) {
builder.addUniformSlot(child(0).getLogicalProperties().getTrait());
Map<Slot, Slot> replaceMap = new HashMap<>();
List<Slot> output = getOutput();
List<? extends Slot> originalOutputs = regularChildrenOutputs.isEmpty()
? child(0).getOutput()
: regularChildrenOutputs.get(0);
for (int i = 0; i < output.size(); i++) {
replaceMap.put(originalOutputs.get(i), output.get(i));
}
builder.replace(replaceMap);
builder.replaceUniformBy(constructReplaceMapForChild(0));
}
}
Loading

0 comments on commit 57e523e

Please sign in to comment.