Skip to content

Commit

Permalink
fix(sql): fix filter in sql with tagKV (#478)
Browse files Browse the repository at this point in the history
fix filter in sql with tagKV to enable the support of TSBS
  • Loading branch information
jzl18thu authored Nov 9, 2024
1 parent f923ab8 commit 826e85b
Show file tree
Hide file tree
Showing 14 changed files with 1,114 additions and 867 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,14 @@ protected Operator generateRoot(Statement statement) {
root = new Rename(new OperatorSource(root), cte.getAliasList());
cte.setRoot(root);
});
return generateRoot(selectStatement);

// 计算语句的操作符树
Operator root = generateRoot(selectStatement);

// 去除最终结果的空列
root = new RemoveNullColumn(new OperatorSource(root));

return root;
}

private Operator generateRoot(SelectStatement selectStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ public RowStream executeUnaryOperator(
return executeGroupBy((GroupBy) operator, table);
case AddSequence:
return executeAddSequence((AddSequence) operator, table);
case RemoveNullColumn:
return executeRemoveNullColumn(table);
case Distinct:
return executeDistinct((Distinct) operator, table);
case ValueToSelectedPath:
Expand Down Expand Up @@ -568,6 +570,44 @@ private RowStream executeDistinct(Distinct distinct, Table table) throws Physica
return new Table(newHeader, targetRows);
}

private RowStream executeRemoveNullColumn(Table table) {
Header header = table.getHeader();
int fieldSize = header.getFieldSize();
List<Row> rows = table.getRows();

List<Integer> remainIndexes = new ArrayList<>();
for (int i = 0; i < fieldSize; i++) {
int finalI = i;
boolean isEmptyColumn = rows.stream().allMatch(row -> row.getValue(finalI) == null);
if (!isEmptyColumn) {
remainIndexes.add(finalI);
}
}

int remainColumnSize = remainIndexes.size();
if (remainColumnSize == fieldSize) { // 没有空列
return table;
} else if (remainIndexes.isEmpty()) { // 全是空列
return rows.isEmpty() ? table : Table.EMPTY_TABLE;
}

List<Field> newFields = new ArrayList<>(remainColumnSize);
for (int index : remainIndexes) {
newFields.add(header.getField(index));
}
Header newHeader = new Header(header.getKey(), newFields);

List<Row> newRows = new ArrayList<>();
for (Row row : rows) {
Object[] values = new Object[remainColumnSize];
for (int i = 0; i < remainColumnSize; i++) {
values[i] = row.getValue(remainIndexes.get(i));
}
newRows.add(new Row(newHeader, row.getKey(), values));
}
return new Table(newHeader, newRows);
}

private RowStream executeValueToSelectedPath(ValueToSelectedPath operator, Table table) {
String prefix = operator.getPrefix();
boolean prefixIsEmpty = prefix.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static boolean validate(Filter filter, Row row) throws PhysicalException
if (row.getKey() == Row.NON_EXISTED_KEY) {
return false;
}
return validateTimeFilter(keyFilter, row);
return validateKeyFilter(keyFilter, row);
case Value:
ValueFilter valueFilter = (ValueFilter) filter;
return validateValueFilter(valueFilter, row);
Expand Down Expand Up @@ -125,7 +125,7 @@ private static boolean validateInFilter(InFilter inFilter, Row row) {
}
}

private static boolean validateTimeFilter(KeyFilter keyFilter, Row row) {
private static boolean validateKeyFilter(KeyFilter keyFilter, Row row) {
long timestamp = row.getKey();
switch (keyFilter.getOp()) {
case E:
Expand Down Expand Up @@ -161,8 +161,8 @@ private static boolean validateValueFilter(ValueFilter valueFilter, Row row)
return false;
}

if (path.contains("*")) { // 带通配符的filter
List<Value> valueList = row.getAsValueByPattern(path);
List<Value> valueList = row.getAsValueByPattern(path);
if (valueList.size() > 1) { // filter的路径名带通配符或同一列名有多个tag
if (Op.isOrOp(valueFilter.getOp())) {
for (Value value : valueList) {
if (value == null || value.isNull()) {
Expand All @@ -186,12 +186,14 @@ private static boolean validateValueFilter(ValueFilter valueFilter, Row row)
} else {
throw new IllegalArgumentException("Unknown op type: " + valueFilter.getOp());
}
} else {
Value value = row.getAsValue(path);
} else if (valueList.size() == 1) {
Value value = valueList.get(0);
if (value == null || value.isNull()) { // value是空值,则认为不可比较
return false;
}
return validateValueCompare(valueFilter.getOp(), value, targetValue);
} else {
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public List<Integer> patternIndexOf(String pattern) {
List<Integer> indexList = new ArrayList<>();
fields.forEach(
field -> {
if (Pattern.matches(StringUtils.reformatPath(pattern), field.getFullName())) {
if (Pattern.matches(StringUtils.reformatPath(pattern), field.getName())) {
indexList.add(indexOf(field.getFullName()));
}
});
Expand All @@ -119,7 +119,7 @@ public List<Integer> patternIndexOf(String pattern) {

@Override
public String toString() {
return "Header{" + "time=" + key + ", fields=" + fields + '}';
return "Header{" + "key=" + key + ", fields=" + fields + '}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* IGinX - the polystore system with high performance
* Copyright (C) Tsinghua University
* [email protected]
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package cn.edu.tsinghua.iginx.engine.shared.operator;

import cn.edu.tsinghua.iginx.engine.shared.operator.type.OperatorType;
import cn.edu.tsinghua.iginx.engine.shared.source.Source;

public class RemoveNullColumn extends AbstractUnaryOperator {

public RemoveNullColumn(Source source) {
super(OperatorType.RemoveNullColumn, source);
}

@Override
public Operator copy() {
return new RemoveNullColumn(getSource().copy());
}

@Override
public UnaryOperator copyWithSource(Source source) {
return new RemoveNullColumn(source);
}

@Override
public String getInfo() {
return "RemoveNullColumn";
}

@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
return object != null && getClass() == object.getClass();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public enum OperatorType {
GroupBy,
Distinct,
AddSequence,
RemoveNullColumn,
ProjectWaitingForPath,
ValueToSelectedPath;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iginx.engine.logical.utils.LogicalFilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.exception.PhysicalException;
import cn.edu.tsinghua.iginx.engine.physical.exception.StorageInitializationException;
import cn.edu.tsinghua.iginx.engine.physical.memory.execute.utils.FilterUtils;
import cn.edu.tsinghua.iginx.engine.physical.storage.IStorage;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.Column;
import cn.edu.tsinghua.iginx.engine.physical.storage.domain.DataArea;
Expand Down Expand Up @@ -73,6 +74,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -623,7 +625,12 @@ private String generateQueryStatement(
statement += filterStr;
}

if (filter != null) {
// TODO:filter中的path有多个tag时暂未实现下推
List<String> filterPaths = FilterUtils.getAllPathsFromFilter(filter);
List<Column> columns = getColumns(new HashSet<>(filterPaths), tagFilter);
boolean hasMultiTags = hasMultiTags(columns);

if (filter != null && !hasMultiTags) {
boolean patternHasMeasurementWildCards = false;
for (String path : paths) {
if (path.startsWith("*")) {
Expand Down Expand Up @@ -687,6 +694,19 @@ private String generateQueryStatement(
return statement;
}

private boolean hasMultiTags(List<Column> columns) {
Map<String, List<Map<String, String>>> tagsMap = new HashMap<>();
for (Column column : columns) {
String path = column.getPath();
if (!tagsMap.containsKey(path)) {
tagsMap.put(path, new ArrayList<>(Collections.singletonList(column.getTags())));
} else if (!tagsMap.get(path).contains(column.getTags())) {
return true;
}
}
return false;
}

private Filter setTrueByMeasurement(Filter filter, String measurementName) {
switch (filter.getType()) {
case And:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -911,20 +911,14 @@ private boolean match(String s, String p) {
}

private String getFilterString(Filter filter, String storageUnit) throws PhysicalException {
String filterStr = FilterTransformer.toString(filter);
if (filterStr.contains("*")) {
List<Column> columns = new ArrayList<>();
Map<String, String> columns2Fragment = new HashMap<>();
getColumns2StorageUnit(columns, columns2Fragment, new HashSet<>(), null);
filterStr =
FilterTransformer.toString(
expandFilterWildcard(filter.copy(), columns, columns2Fragment, storageUnit));
}

return filterStr;
List<Column> columns = new ArrayList<>();
Map<String, String> columns2Fragment = new HashMap<>();
getColumns2StorageUnit(columns, columns2Fragment, new HashSet<>(), null);
Filter fullFilter = expandFilter(filter.copy(), columns, columns2Fragment, storageUnit);
return FilterTransformer.toString(fullFilter);
}

private Filter expandFilterWildcard(
private Filter expandFilter(
Filter filter,
List<Column> columns,
Map<String, String> columns2Fragment,
Expand All @@ -935,9 +929,9 @@ private Filter expandFilterWildcard(
List<Filter> children = andFilter.getChildren();
List<Filter> newAndFilters = new ArrayList<>();
for (Filter f : children) {
Filter newFilter = expandFilterWildcard(f, columns, columns2Fragment, storageUnit);
Filter newFilter = expandFilter(f, columns, columns2Fragment, storageUnit);
if (newFilter != null) {
newAndFilters.add(expandFilterWildcard(f, columns, columns2Fragment, storageUnit));
newAndFilters.add(expandFilter(f, columns, columns2Fragment, storageUnit));
}
}
return new AndFilter(newAndFilters);
Expand All @@ -946,17 +940,16 @@ private Filter expandFilterWildcard(
List<Filter> orChildren = orFilter.getChildren();
List<Filter> newOrFilters = new ArrayList<>();
for (Filter f : orChildren) {
Filter newFilter = expandFilterWildcard(f, columns, columns2Fragment, storageUnit);
Filter newFilter = expandFilter(f, columns, columns2Fragment, storageUnit);
if (newFilter != null) {
newOrFilters.add(expandFilterWildcard(f, columns, columns2Fragment, storageUnit));
newOrFilters.add(expandFilter(f, columns, columns2Fragment, storageUnit));
}
}
return new OrFilter(newOrFilters);
case Not:
NotFilter notFilter = (NotFilter) filter;
Filter notChild = notFilter.getChild();
Filter newNotFilter =
expandFilterWildcard(notChild, columns, columns2Fragment, storageUnit);
Filter newNotFilter = expandFilter(notChild, columns, columns2Fragment, storageUnit);
if (newNotFilter != null) return new NotFilter(newNotFilter);
else return null;
case Key:
Expand All @@ -965,25 +958,22 @@ private Filter expandFilterWildcard(
ValueFilter valueFilter = (ValueFilter) filter;
DataType valueType = valueFilter.getValue().getDataType();
String path = valueFilter.getPath();

if (path.contains("*")) {
List<String> matchedPath =
getMatchPath(path, valueType, columns, columns2Fragment, storageUnit);
if (matchedPath.size() == 0) {
return null;
}

List<String> matchedPaths =
getMatchPath(path, valueType, columns, columns2Fragment, storageUnit);
if (matchedPaths.size() == 1) {
return new ValueFilter(matchedPaths.get(0), valueFilter.getOp(), valueFilter.getValue());
} else if (matchedPaths.isEmpty()) {
return null;
} else {
List<Filter> newFilters = new ArrayList<>();
for (String p : matchedPath) {
for (String p : matchedPaths) {
newFilters.add(new ValueFilter(p, valueFilter.getOp(), valueFilter.getValue()));
}
if (Op.isOrOp(valueFilter.getOp())) {
return new OrFilter(newFilters);
} else {
return new AndFilter(newFilters);
}
} else {
return filter;
}
case In:
InFilter inFilter = (InFilter) filter;
Expand Down Expand Up @@ -1043,7 +1033,7 @@ private List<String> getMatchPath(

Matcher matcher = pattern.matcher(columnName);
if (matcher.find()) {
matchedPath.add(columnName);
matchedPath.add(TagKVUtils.toFullName(columnName, col.getTags()));
}
}

Expand Down
Loading

0 comments on commit 826e85b

Please sign in to comment.