From c7939f88b8e17a00e1fa85abff0f903479dbad0b Mon Sep 17 00:00:00 2001 From: jzl18thu Date: Thu, 14 Nov 2024 16:40:09 +0800 Subject: [PATCH] feat(sql): join by key & extract & add dummy test (#488) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 增加 join using key 的语法 增加系统函数:extract 增加叠加分片范围有重叠,但实际数据不重叠的测试 --- .../antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 | 2 +- .../logical/generator/QueryGenerator.java | 12 +- .../engine/logical/utils/OperatorUtils.java | 4 + .../naive/NaiveOperatorMemoryExecutor.java | 147 ++++++++++++------ .../engine/shared/function/FunctionUtils.java | 4 +- .../function/manager/FunctionManager.java | 2 + .../shared/function/system/Extract.java | 120 ++++++++++++++ .../engine/shared/operator/InnerJoin.java | 41 +++++ .../engine/shared/operator/OuterJoin.java | 39 +++++ .../tsinghua/iginx/sql/IginXSqlVisitor.java | 3 +- .../frompart/join/JoinCondition.java | 22 ++- .../FilterPushDownPathUnionJoinRule.java | 6 +- .../tsinghua/iginx/optimizer/TreeBuilder.java | 3 +- .../expansion/BaseCapacityExpansionIT.java | 51 +++++- .../mongodb/MongoDBCapacityExpansionIT.java | 36 +++++ .../mysql/MySQLCapacityExpansionIT.java | 36 +++++ .../PostgreSQLCapacityExpansionIT.java | 36 +++++ .../redis/RedisCapacityExpansionIT.java | 35 +++++ .../integration/func/sql/SQLSessionIT.java | 116 ++++++++++++++ 19 files changed, 643 insertions(+), 72 deletions(-) create mode 100644 core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/Extract.java diff --git a/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 b/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 index 3da70820a7..6fc07ac307 100644 --- a/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 +++ b/antlr/src/main/antlr4/cn/edu/tsinghua/iginx/sql/Sql.g4 @@ -279,7 +279,7 @@ fromClause joinPart : COMMA tableReference | CROSS JOIN tableReference - | join tableReference (ON orExpression | USING colList)? + | join tableReference (ON orExpression | USING (KEY | colList))? ; tableReference diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java index e2c5d7c963..01ca042260 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/generator/QueryGenerator.java @@ -481,6 +481,7 @@ private Operator initFilterAndMergeFragmentsWithJoin(UnarySelectStatement select Filter filter = joinCondition.getFilter(); List joinColumns = joinCondition.getJoinColumns(); boolean isNaturalJoin = JoinType.isNaturalJoin(joinCondition.getJoinType()); + boolean isJoinByKey = joinCondition.isJoinByKey(); if (!joinColumns.isEmpty() || isNaturalJoin) { if (prefixA == null || prefixB == null) { @@ -507,6 +508,7 @@ private Operator initFilterAndMergeFragmentsWithJoin(UnarySelectStatement select filter, joinColumns, isNaturalJoin, + isJoinByKey, joinAlgType); break; case LeftOuterJoin: @@ -527,6 +529,7 @@ private Operator initFilterAndMergeFragmentsWithJoin(UnarySelectStatement select filter, joinColumns, isNaturalJoin, + isJoinByKey, joinAlgType); break; default: @@ -727,7 +730,7 @@ private Operator buildAddSequence(UnarySelectStatement selectStatement, Operator * @return 添加了Reorder操作符的根节点 */ private static Operator buildReorder(UnarySelectStatement selectStatement, Operator root) { - boolean hasFuncWithArgs = + boolean hasUDFWithArgs = selectStatement.getExpressions().stream() .anyMatch( expression -> { @@ -735,13 +738,14 @@ private static Operator buildReorder(UnarySelectStatement selectStatement, Opera return false; } FuncExpression funcExpression = ((FuncExpression) expression); - return !funcExpression.getArgs().isEmpty() - || !funcExpression.getKvargs().isEmpty(); + return funcExpression.isPyUDF() + && (!funcExpression.getArgs().isEmpty() + || !funcExpression.getKvargs().isEmpty()); }); if (selectStatement.isLastFirst()) { root = new Reorder(new OperatorSource(root), Arrays.asList("path", "value")); - } else if (hasFuncWithArgs) { + } else if (hasUDFWithArgs) { root = new Reorder(new OperatorSource(root), new ArrayList<>(Collections.singletonList("*"))); } else { List order = new ArrayList<>(); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java index f953cb9d81..9747f1646f 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/logical/utils/OperatorUtils.java @@ -222,6 +222,7 @@ public static Operator translateApply(Operator root, List correlatedVari null, new ArrayList<>(), false, + false, JoinAlgType.HashJoin, correlatedVariables); } else { @@ -311,6 +312,7 @@ private static Operator pushDownApply(Operator root, List correlatedVari singleJoin.getFilter(), new ArrayList<>(), false, + false, singleJoin.getJoinAlgType(), singleJoin.getExtraJoinPrefix()); } @@ -374,6 +376,7 @@ private static Operator pushDownApply(Operator root, List correlatedVari new BoolFilter(true), new ArrayList<>(), false, + false, JoinAlgType.HashJoin, correlatedVariables); } @@ -563,6 +566,7 @@ private static Operator combineAdjacentSelectAndJoin(Select select) { select.getFilter(), new ArrayList<>(), false, + false, algType, crossJoin.getExtraJoinPrefix()); case InnerJoin: diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java index 23e6f6c3ff..9e111e24e9 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/physical/memory/execute/naive/NaiveOperatorMemoryExecutor.java @@ -53,6 +53,7 @@ import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; import cn.edu.tsinghua.iginx.engine.shared.data.read.RowStream; +import cn.edu.tsinghua.iginx.engine.shared.expr.KeyExpression; import cn.edu.tsinghua.iginx.engine.shared.function.*; import cn.edu.tsinghua.iginx.engine.shared.function.system.Max; import cn.edu.tsinghua.iginx.engine.shared.function.system.Min; @@ -660,57 +661,7 @@ private RowStream executeJoin(Join join, Table tableA, Table tableB) throws Phys } // 目前只支持使用时间戳和顺序 if (join.getJoinBy().equals(Constants.KEY)) { - // 检查时间戳 - if (!headerA.hasKey() || !headerB.hasKey()) { - throw new InvalidOperatorParameterException( - "row streams for join operator by key should have key."); - } - List newFields = new ArrayList<>(); - newFields.addAll(headerA.getFields()); - newFields.addAll(headerB.getFields()); - Header newHeader = new Header(Field.KEY, newFields); - List newRows = new ArrayList<>(); - - int index1 = 0, index2 = 0; - while (index1 < tableA.getRowSize() && index2 < tableB.getRowSize()) { - Row rowA = tableA.getRow(index1), rowB = tableB.getRow(index2); - Object[] values = new Object[newHeader.getFieldSize()]; - long timestamp; - if (rowA.getKey() == rowB.getKey()) { - timestamp = rowA.getKey(); - System.arraycopy(rowA.getValues(), 0, values, 0, headerA.getFieldSize()); - System.arraycopy( - rowB.getValues(), 0, values, headerA.getFieldSize(), headerB.getFieldSize()); - index1++; - index2++; - } else if (rowA.getKey() < rowB.getKey()) { - timestamp = rowA.getKey(); - System.arraycopy(rowA.getValues(), 0, values, 0, headerA.getFieldSize()); - index1++; - } else { - timestamp = rowB.getKey(); - System.arraycopy( - rowB.getValues(), 0, values, headerA.getFieldSize(), headerB.getFieldSize()); - index2++; - } - newRows.add(new Row(newHeader, timestamp, values)); - } - - for (; index1 < tableA.getRowSize(); index1++) { - Row rowA = tableA.getRow(index1); - Object[] values = new Object[newHeader.getFieldSize()]; - System.arraycopy(rowA.getValues(), 0, values, 0, headerA.getFieldSize()); - newRows.add(new Row(newHeader, rowA.getKey(), values)); - } - - for (; index2 < tableB.getRowSize(); index2++) { - Row rowB = tableB.getRow(index2); - Object[] values = new Object[newHeader.getFieldSize()]; - System.arraycopy( - rowB.getValues(), 0, values, headerA.getFieldSize(), headerB.getFieldSize()); - newRows.add(new Row(newHeader, rowB.getKey(), values)); - } - return new Table(newHeader, newRows); + return executeJoinByKey(tableA, tableB, true, true); } else if (join.getJoinBy().equals(Constants.ORDINAL)) { if (headerA.hasKey() || headerB.hasKey()) { throw new InvalidOperatorParameterException( @@ -778,6 +729,17 @@ private RowStream executeCrossJoin(CrossJoin crossJoin, Table tableA, Table tabl private RowStream executeInnerJoin(InnerJoin innerJoin, Table tableA, Table tableB) throws PhysicalException { + if (innerJoin.isJoinByKey()) { + Sort sortByKey = + new Sort( + EmptySource.EMPTY_SOURCE, + Collections.singletonList(new KeyExpression(KEY)), + Collections.singletonList(Sort.SortType.ASC)); + tableA = transformToTable(executeSort(sortByKey, tableA)); + tableB = transformToTable(executeSort(sortByKey, tableB)); + return executeJoinByKey(tableA, tableB, false, false); + } + switch (innerJoin.getJoinAlgType()) { case NestedLoopJoin: return executeNestedLoopInnerJoin(innerJoin, tableA, tableB); @@ -790,6 +752,76 @@ private RowStream executeInnerJoin(InnerJoin innerJoin, Table tableA, Table tabl } } + private RowStream executeJoinByKey(Table tableA, Table tableB, boolean isLeft, boolean isRight) + throws PhysicalException { + Header headerA = tableA.getHeader(); + Header headerB = tableB.getHeader(); + // 检查时间戳 + if (!headerA.hasKey() || !headerB.hasKey()) { + throw new InvalidOperatorParameterException( + "row streams for join operator by key should have key."); + } + List newFields = new ArrayList<>(); + newFields.addAll(headerA.getFields()); + newFields.addAll(headerB.getFields()); + Header newHeader = new Header(Field.KEY, newFields); + List newRows = new ArrayList<>(); + + int index1 = 0, index2 = 0; + while (index1 < tableA.getRowSize() && index2 < tableB.getRowSize()) { + Row rowA = tableA.getRow(index1), rowB = tableB.getRow(index2); + Object[] values = new Object[newHeader.getFieldSize()]; + long timestamp; + if (rowA.getKey() == rowB.getKey()) { + timestamp = rowA.getKey(); + System.arraycopy(rowA.getValues(), 0, values, 0, headerA.getFieldSize()); + System.arraycopy( + rowB.getValues(), 0, values, headerA.getFieldSize(), headerB.getFieldSize()); + index1++; + index2++; + } else if (rowA.getKey() < rowB.getKey()) { + index1++; + if (!isLeft) { // 内连接和右连接不保留该结果 + continue; + } + timestamp = rowA.getKey(); + System.arraycopy(rowA.getValues(), 0, values, 0, headerA.getFieldSize()); + } else { + index2++; + if (!isRight) { // 内连接和左连接不保留该结果 + continue; + } + timestamp = rowB.getKey(); + System.arraycopy( + rowB.getValues(), 0, values, headerA.getFieldSize(), headerB.getFieldSize()); + } + newRows.add(new Row(newHeader, timestamp, values)); + } + + // 左连接和全连接才保留该结果 + if (isLeft) { + for (; index1 < tableA.getRowSize(); index1++) { + Row rowA = tableA.getRow(index1); + Object[] values = new Object[newHeader.getFieldSize()]; + System.arraycopy(rowA.getValues(), 0, values, 0, headerA.getFieldSize()); + newRows.add(new Row(newHeader, rowA.getKey(), values)); + } + } + + // 右连接和全连接才保留该结果 + if (isRight) { + for (; index2 < tableB.getRowSize(); index2++) { + Row rowB = tableB.getRow(index2); + Object[] values = new Object[newHeader.getFieldSize()]; + System.arraycopy( + rowB.getValues(), 0, values, headerA.getFieldSize(), headerB.getFieldSize()); + newRows.add(new Row(newHeader, rowB.getKey(), values)); + } + } + + return new Table(newHeader, newRows); + } + private RowStream executeNestedLoopInnerJoin(InnerJoin innerJoin, Table tableA, Table tableB) throws PhysicalException { List joinColumns = new ArrayList<>(innerJoin.getJoinColumns()); @@ -1187,6 +1219,19 @@ private RowStream executeSortedMergeInnerJoin(InnerJoin innerJoin, Table tableA, private RowStream executeOuterJoin(OuterJoin outerJoin, Table tableA, Table tableB) throws PhysicalException { + if (outerJoin.isJoinByKey()) { + Sort sortByKey = + new Sort( + EmptySource.EMPTY_SOURCE, + Collections.singletonList(new KeyExpression(KEY)), + Collections.singletonList(Sort.SortType.ASC)); + tableA = transformToTable(executeSort(sortByKey, tableA)); + tableB = transformToTable(executeSort(sortByKey, tableB)); + boolean isLeft = outerJoin.getOuterJoinType() != OuterJoinType.RIGHT; + boolean isRight = outerJoin.getOuterJoinType() != OuterJoinType.LEFT; + return executeJoinByKey(tableA, tableB, isLeft, isRight); + } + switch (outerJoin.getJoinAlgType()) { case NestedLoopJoin: return executeNestedLoopOuterJoin(outerJoin, tableA, tableB); diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionUtils.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionUtils.java index 137eb624bc..023cc37f10 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionUtils.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/FunctionUtils.java @@ -53,7 +53,7 @@ public class FunctionUtils { private static final String VALUE = "value"; private static final Set sysRowToRowFunctionSet = - new HashSet<>(Arrays.asList("ratio", "substring")); + new HashSet<>(Arrays.asList("extract", "ratio", "substring")); private static final Set sysSetToRowFunctionSet = new HashSet<>( @@ -165,7 +165,6 @@ public static String getFunctionName(Function function) { static Map expectedParamNumMap = new HashMap<>(); // 此Map用于存储function期望的参数个数 - // TODO static { expectedParamNumMap.put("avg", 1); expectedParamNumMap.put("sum", 1); @@ -176,6 +175,7 @@ public static String getFunctionName(Function function) { expectedParamNumMap.put("last_value", 1); expectedParamNumMap.put("first", 1); expectedParamNumMap.put("last", 1); + expectedParamNumMap.put("extract", 1); expectedParamNumMap.put("ratio", 2); expectedParamNumMap.put("substring", 1); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/manager/FunctionManager.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/manager/FunctionManager.java index e04cf5a4f5..41a90f8d55 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/manager/FunctionManager.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/manager/FunctionManager.java @@ -28,6 +28,7 @@ import cn.edu.tsinghua.iginx.engine.shared.function.system.ArithmeticExpr; import cn.edu.tsinghua.iginx.engine.shared.function.system.Avg; import cn.edu.tsinghua.iginx.engine.shared.function.system.Count; +import cn.edu.tsinghua.iginx.engine.shared.function.system.Extract; import cn.edu.tsinghua.iginx.engine.shared.function.system.First; import cn.edu.tsinghua.iginx.engine.shared.function.system.FirstValue; import cn.edu.tsinghua.iginx.engine.shared.function.system.Last; @@ -100,6 +101,7 @@ private void initSystemFunctions() { registerFunction(Min.getInstance()); registerFunction(Sum.getInstance()); registerFunction(ArithmeticExpr.getInstance()); + registerFunction(Extract.getInstance()); registerFunction(Ratio.getInstance()); registerFunction(SubString.getInstance()); } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/Extract.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/Extract.java new file mode 100644 index 0000000000..96c6bbd4a9 --- /dev/null +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/function/system/Extract.java @@ -0,0 +1,120 @@ +/* + * IGinX - the polystore system with high performance + * Copyright (C) Tsinghua University + * TSIGinX@gmail.com + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 3 of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ +package cn.edu.tsinghua.iginx.engine.shared.function.system; + +import cn.edu.tsinghua.iginx.engine.shared.data.Value; +import cn.edu.tsinghua.iginx.engine.shared.data.read.Field; +import cn.edu.tsinghua.iginx.engine.shared.data.read.Header; +import cn.edu.tsinghua.iginx.engine.shared.data.read.Row; +import cn.edu.tsinghua.iginx.engine.shared.function.FunctionParams; +import cn.edu.tsinghua.iginx.engine.shared.function.FunctionType; +import cn.edu.tsinghua.iginx.engine.shared.function.MappingType; +import cn.edu.tsinghua.iginx.engine.shared.function.RowMappingFunction; +import cn.edu.tsinghua.iginx.thrift.DataType; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.Collections; +import java.util.TimeZone; + +public class Extract implements RowMappingFunction { + + public static final String EXTRACT = "extract"; + + private static final Extract INSTANCE = new Extract(); + + private Extract() {} + + public static Extract getInstance() { + return INSTANCE; + } + + @Override + public FunctionType getFunctionType() { + return FunctionType.System; + } + + @Override + public MappingType getMappingType() { + return MappingType.RowMapping; + } + + @Override + public String getIdentifier() { + return EXTRACT; + } + + @Override + public Row transform(Row row, FunctionParams params) throws Exception { + if (params.getPaths().size() != 1 || params.getArgs().size() != 1) { + throw new IllegalArgumentException("Unexpected params for extract."); + } + + String path = params.getPaths().get(0); + Value valueA = row.getAsValue(path); + if (valueA == null || valueA.isNull()) { + return Row.EMPTY_ROW; + } + if (valueA.getDataType() != DataType.LONG && valueA.getDataType() != DataType.INTEGER) { + throw new IllegalArgumentException("Unexpected data type for extract function."); + } + + if (!(params.getArgs().get(0) instanceof byte[])) { + throw new IllegalArgumentException("The 2nd arg 'field' for extract should be a string."); + } + + long timestamp = (long) valueA.getValue(); + Instant instant = Instant.ofEpochMilli(timestamp); + LocalDateTime localDateTime = + instant.atZone(TimeZone.getTimeZone("GMT").toZoneId()).toLocalDateTime(); + + int ret; + String field = new String((byte[]) params.getArgs().get(0)).toLowerCase(); + switch (field) { + case "year": + ret = localDateTime.getYear(); + break; + case "month": + ret = localDateTime.getMonthValue(); + break; + case "day": + ret = localDateTime.getDayOfMonth(); + break; + case "hour": + ret = localDateTime.getHour(); + break; + case "minute": + ret = localDateTime.getMinute(); + break; + case "second": + ret = localDateTime.getSecond(); + break; + default: + throw new IllegalArgumentException( + "The 2nd arg 'field' for extract is expected in [\"year\", \"month\", \"day\", \"hour\", \"minute\", \"second\"]."); + } + + Header newHeader = + new Header( + row.getHeader().getKey(), + Collections.singletonList( + new Field("extract(" + path + ", " + field + ")", DataType.INTEGER))); + return new Row(newHeader, row.getKey(), new Object[] {ret}); + } +} diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/InnerJoin.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/InnerJoin.java index c204bb1119..fe89f92566 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/InnerJoin.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/InnerJoin.java @@ -39,6 +39,8 @@ public class InnerJoin extends AbstractJoin { private final boolean isNaturalJoin; + private final boolean isJoinByKey; + public InnerJoin( Source sourceA, Source sourceB, @@ -66,6 +68,7 @@ public InnerJoin( tagFilter, joinColumns, false, + false, JoinAlgType.HashJoin, new ArrayList<>()); } @@ -87,6 +90,7 @@ public InnerJoin( null, joinColumns, isNaturalJoin, + false, JoinAlgType.HashJoin, new ArrayList<>()); } @@ -109,6 +113,31 @@ public InnerJoin( null, joinColumns, isNaturalJoin, + false, + joinAlgType, + new ArrayList<>()); + } + + public InnerJoin( + Source sourceA, + Source sourceB, + String prefixA, + String prefixB, + Filter filter, + List joinColumns, + boolean isNaturalJoin, + boolean isJoinByKey, + JoinAlgType joinAlgType) { + this( + sourceA, + sourceB, + prefixA, + prefixB, + filter, + null, + joinColumns, + isNaturalJoin, + isJoinByKey, joinAlgType, new ArrayList<>()); } @@ -122,6 +151,7 @@ public InnerJoin( TagFilter tagFilter, List joinColumns, boolean isNaturalJoin, + boolean isJoinByKey, JoinAlgType joinAlgType, List extraJoinPrefix) { super(OperatorType.InnerJoin, sourceA, sourceB, prefixA, prefixB, joinAlgType, extraJoinPrefix); @@ -132,6 +162,7 @@ public InnerJoin( this.joinColumns = new ArrayList<>(); } this.isNaturalJoin = isNaturalJoin; + this.isJoinByKey = isJoinByKey; this.tagFilter = tagFilter; } @@ -143,6 +174,7 @@ public InnerJoin( Filter filter, List joinColumns, boolean isNaturalJoin, + boolean isJoinByKey, JoinAlgType joinAlgType, List extraJoinPrefix) { this( @@ -154,6 +186,7 @@ public InnerJoin( null, joinColumns, isNaturalJoin, + isJoinByKey, joinAlgType, extraJoinPrefix); } @@ -170,6 +203,10 @@ public boolean isNaturalJoin() { return isNaturalJoin; } + public boolean isJoinByKey() { + return isJoinByKey; + } + public void setFilter(Filter filter) { this.filter = filter; } @@ -197,6 +234,7 @@ public Operator copy() { tagFilter == null ? null : tagFilter.copy(), new ArrayList<>(joinColumns), isNaturalJoin, + isJoinByKey, getJoinAlgType(), new ArrayList<>(getExtraJoinPrefix())); } @@ -212,12 +250,14 @@ public BinaryOperator copyWithSource(Source sourceA, Source sourceB) { tagFilter == null ? null : tagFilter.copy(), new ArrayList<>(joinColumns), isNaturalJoin, + isJoinByKey, getJoinAlgType(), new ArrayList<>(getExtraJoinPrefix())); } @Override public String getInfo() { + // TODO StringBuilder builder = new StringBuilder(); builder.append("PrefixA: ").append(getPrefixA()); builder.append(", PrefixB: ").append(getPrefixB()); @@ -252,6 +292,7 @@ public boolean equals(Object object) { } InnerJoin that = (InnerJoin) object; return isNaturalJoin == that.isNaturalJoin + && isJoinByKey == that.isJoinByKey && joinColumns.equals(that.joinColumns) && filter.equals(that.filter) && tagFilter.equals(that.tagFilter) diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/OuterJoin.java b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/OuterJoin.java index 540f06f106..0656fccac4 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/OuterJoin.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/engine/shared/operator/OuterJoin.java @@ -37,6 +37,8 @@ public class OuterJoin extends AbstractJoin { private final boolean isNaturalJoin; + private final boolean isJoinByKey; + public OuterJoin( Source sourceA, Source sourceB, @@ -54,6 +56,7 @@ public OuterJoin( filter, joinColumns, false, + false, JoinAlgType.HashJoin, new ArrayList<>()); } @@ -77,6 +80,32 @@ public OuterJoin( filter, joinColumns, isNaturalJoin, + false, + joinAlgType, + new ArrayList<>()); + } + + public OuterJoin( + Source sourceA, + Source sourceB, + String prefixA, + String prefixB, + OuterJoinType outerJoinType, + Filter filter, + List joinColumns, + boolean isNaturalJoin, + boolean isJoinByKey, + JoinAlgType joinAlgType) { + this( + sourceA, + sourceB, + prefixA, + prefixB, + outerJoinType, + filter, + joinColumns, + isNaturalJoin, + isJoinByKey, joinAlgType, new ArrayList<>()); } @@ -90,6 +119,7 @@ public OuterJoin( Filter filter, List joinColumns, boolean isNaturalJoin, + boolean isJoinByKey, JoinAlgType joinAlgType, List extraJoinPrefix) { super(OperatorType.OuterJoin, sourceA, sourceB, prefixA, prefixB, joinAlgType, extraJoinPrefix); @@ -101,6 +131,7 @@ public OuterJoin( this.joinColumns = new ArrayList<>(); } this.isNaturalJoin = isNaturalJoin; + this.isJoinByKey = isJoinByKey; } public OuterJoinType getOuterJoinType() { @@ -123,6 +154,10 @@ public boolean isNaturalJoin() { return isNaturalJoin; } + public boolean isJoinByKey() { + return isJoinByKey; + } + public void setFilter(Filter filter) { this.filter = filter; } @@ -143,6 +178,7 @@ public Operator copy() { filter.copy(), new ArrayList<>(joinColumns), isNaturalJoin, + isJoinByKey, getJoinAlgType(), new ArrayList<>(getExtraJoinPrefix())); } @@ -158,12 +194,14 @@ public BinaryOperator copyWithSource(Source sourceA, Source sourceB) { filter.copy(), new ArrayList<>(joinColumns), isNaturalJoin, + isJoinByKey, getJoinAlgType(), new ArrayList<>(getExtraJoinPrefix())); } @Override public String getInfo() { + // TODO StringBuilder builder = new StringBuilder(); builder.append("PrefixA: ").append(getPrefixA()); builder.append(", PrefixB: ").append(getPrefixB()); @@ -202,6 +240,7 @@ public boolean equals(Object object) { && filter.equals(that.filter) && joinColumns.equals(that.joinColumns) && isNaturalJoin == that.isNaturalJoin + && isJoinByKey == that.isJoinByKey && getExtraJoinPrefix().equals(that.getExtraJoinPrefix()); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java b/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java index b4a8f19c9d..b6d5edee86 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/sql/IginXSqlVisitor.java @@ -718,7 +718,8 @@ private void parseFromParts(FromClauseContext ctx, UnarySelectStatement selectSt .path() .forEach(pathContext -> columns.add(parsePath(pathContext))); } - joinPart.setJoinCondition(new JoinCondition(joinType, filter, columns)); + boolean isJoinByKey = joinPartContext.KEY() != null; + joinPart.setJoinCondition(new JoinCondition(joinType, filter, columns, isJoinByKey)); fromParts.add(joinPart); } } diff --git a/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/frompart/join/JoinCondition.java b/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/frompart/join/JoinCondition.java index d271ab2367..a46a30cf95 100644 --- a/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/frompart/join/JoinCondition.java +++ b/core/src/main/java/cn/edu/tsinghua/iginx/sql/statement/frompart/join/JoinCondition.java @@ -30,21 +30,28 @@ public class JoinCondition { private final JoinType joinType; private final Filter filter; private final List joinColumns; + private final boolean isJoinByKey; private final String markColumn; private final boolean isAntiJoin; public JoinCondition() { - this(JoinType.CrossJoin, null, Collections.emptyList()); + this(JoinType.CrossJoin, null, Collections.emptyList(), false); } public JoinCondition(JoinType joinType, Filter filter) { - this(joinType, filter, Collections.emptyList()); + this(joinType, filter, Collections.emptyList(), false); } public JoinCondition(JoinType joinType, Filter filter, List joinColumns) { + this(joinType, filter, joinColumns, false); + } + + public JoinCondition( + JoinType joinType, Filter filter, List joinColumns, boolean isJoinByKey) { this.joinType = joinType; this.filter = filter; this.joinColumns = joinColumns; + this.isJoinByKey = isJoinByKey; this.markColumn = null; this.isAntiJoin = false; } @@ -53,6 +60,7 @@ public JoinCondition(JoinType joinType, Filter filter, String markColumn, boolea this.joinType = joinType; this.filter = filter; this.joinColumns = new ArrayList<>(); + this.isJoinByKey = false; this.markColumn = markColumn; this.isAntiJoin = isAntiJoin; } @@ -69,6 +77,10 @@ public List getJoinColumns() { return joinColumns; } + public boolean isJoinByKey() { + return isJoinByKey; + } + public String getMarkColumn() { return markColumn; } @@ -88,11 +100,13 @@ public boolean equals(Object o) { JoinCondition joinCondition = (JoinCondition) o; return joinType == joinCondition.joinType && Objects.equals(filter, joinCondition.filter) - && Objects.equals(joinColumns, joinCondition.joinColumns); + && Objects.equals(joinColumns, joinCondition.joinColumns) + && isJoinByKey == joinCondition.isJoinByKey + && isAntiJoin == joinCondition.isAntiJoin; } @Override public int hashCode() { - return Objects.hash(joinType, filter, joinColumns); + return Objects.hash(joinType, filter, joinColumns, isJoinByKey, isAntiJoin); } } diff --git a/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/FilterPushDownPathUnionJoinRule.java b/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/FilterPushDownPathUnionJoinRule.java index adee6fccdf..3c73d6478d 100644 --- a/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/FilterPushDownPathUnionJoinRule.java +++ b/optimizer/src/main/java/cn/edu/tsinghua/iginx/logical/optimizer/rules/FilterPushDownPathUnionJoinRule.java @@ -179,7 +179,7 @@ public void onMatch(RuleCall call) { new Select(operator.getSourceA(), new AndFilter(leftFilters), select.getTagFilter()); operator.setSourceA(new OperatorSource(leftSelect)); - if (operator.getType() == OperatorType.OuterJoin) { + if (operator.getType() == OperatorType.OuterJoin && !((OuterJoin) operator).isJoinByKey()) { OuterJoin outerJoin = (OuterJoin) operator; if (outerJoin.getOuterJoinType() == OuterJoinType.RIGHT) { operator = @@ -191,6 +191,7 @@ public void onMatch(RuleCall call) { outerJoin.getFilter(), outerJoin.getJoinColumns(), outerJoin.isNaturalJoin(), + outerJoin.isJoinByKey(), outerJoin.getJoinAlgType(), outerJoin.getExtraJoinPrefix()); } else if (outerJoin.getOuterJoinType() == OuterJoinType.FULL) { @@ -204,7 +205,7 @@ public void onMatch(RuleCall call) { new Select(operator.getSourceB(), new AndFilter(rightFilters), select.getTagFilter()); operator.setSourceB(new OperatorSource(rightSelect)); - if (operator.getType() == OperatorType.OuterJoin) { + if (operator.getType() == OperatorType.OuterJoin && !((OuterJoin) operator).isJoinByKey()) { OuterJoin outerJoin = (OuterJoin) operator; if (outerJoin.getOuterJoinType() == OuterJoinType.LEFT) { operator = @@ -216,6 +217,7 @@ public void onMatch(RuleCall call) { outerJoin.getFilter(), outerJoin.getJoinColumns(), outerJoin.isNaturalJoin(), + outerJoin.isJoinByKey(), outerJoin.getJoinAlgType(), outerJoin.getExtraJoinPrefix()); } else if (outerJoin.getOuterJoinType() == OuterJoinType.FULL) { diff --git a/optimizer/src/test/java/cn/edu/tsinghua/iginx/optimizer/TreeBuilder.java b/optimizer/src/test/java/cn/edu/tsinghua/iginx/optimizer/TreeBuilder.java index 0660ee3265..f5561eb1a0 100644 --- a/optimizer/src/test/java/cn/edu/tsinghua/iginx/optimizer/TreeBuilder.java +++ b/optimizer/src/test/java/cn/edu/tsinghua/iginx/optimizer/TreeBuilder.java @@ -97,7 +97,8 @@ public static Operator buildJoinTree1() { null, null, new PathFilter("test.a", Op.E, "test.b"), - null); + null, + false); Project projectC = new Project(EmptySource.EMPTY_SOURCE, Collections.emptyList(), null); OuterJoin outerJoin = diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java index 530718f4dd..483adf7579 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/BaseCapacityExpansionIT.java @@ -206,7 +206,7 @@ private void addStorageEngineInProgress( } @Test - public void oriHasDataExpHasData() { + public void oriHasDataExpHasData() throws SessionException { // 查询原始节点的历史数据,结果不为空 testQueryHistoryDataOriHasData(); // 写入并查询新数据 @@ -222,6 +222,8 @@ public void oriHasDataExpHasData() { testWriteAndQueryNewDataAfterCE(); // 测试插入相同数据后warning testSameKeyWarning(); + // 测试分片范围重叠,但数据不重叠 + testPathOverlappedDataNotOverlapped(); } @Test @@ -920,16 +922,19 @@ protected void testDummyKeyRange() { } private void testSameKeyWarning() { + if (!SUPPORT_KEY.get(testConf.getStorageType())) { + return; + } + try { session.executeSql( "insert into mn.wf01.wt01 (key, status) values (0, 123),(1, 123),(2, 123),(3, 123);"); String statement = "select * from mn.wf01.wt01;"; QueryDataSet res = session.executeQuery(statement); - if ((res.getWarningMsg() == null - || res.getWarningMsg().isEmpty() - || !res.getWarningMsg().contains("The query results contain overlapped keys.")) - && SUPPORT_KEY.get(testConf.getStorageType())) { + if (res.getWarningMsg() == null + || res.getWarningMsg().isEmpty() + || !res.getWarningMsg().contains("The query results contain overlapped keys.")) { LOGGER.error("未抛出重叠key的警告"); fail(); } @@ -937,7 +942,7 @@ private void testSameKeyWarning() { clearData(); res = session.executeQuery(statement); - if (res.getWarningMsg() != null && SUPPORT_KEY.get(testConf.getStorageType())) { + if (res.getWarningMsg() != null) { LOGGER.error("不应抛出重叠key的警告"); fail(); } @@ -946,6 +951,40 @@ private void testSameKeyWarning() { } } + protected void testPathOverlappedDataNotOverlapped() throws SessionException { + // before + String statement = "select status from mn.wf01.wt01;"; + String expected = + "ResultSets:\n" + + "+---+-------------------+\n" + + "|key|mn.wf01.wt01.status|\n" + + "+---+-------------------+\n" + + "| 0| 11111111|\n" + + "| 1| 22222222|\n" + + "+---+-------------------+\n" + + "Total line number = 2\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + + String insert = + "insert into mn.wf01.wt01 (key, status) values (10, 33333333), (100, 44444444);"; + session.executeSql(insert); + + // after + statement = "select status from mn.wf01.wt01;"; + expected = + "ResultSets:\n" + + "+---+-------------------+\n" + + "|key|mn.wf01.wt01.status|\n" + + "+---+-------------------+\n" + + "| 0| 11111111|\n" + + "| 1| 22222222|\n" + + "| 10| 33333333|\n" + + "|100| 44444444|\n" + + "+---+-------------------+\n" + + "Total line number = 4\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + } + protected void startStorageEngineWithIginx(int port, boolean hasData, boolean isReadOnly) { String scriptPath, iginxPath = ".github/scripts/iginx/iginx.sh"; String os = System.getProperty("os.name").toLowerCase(); diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java index 61fb8230c3..22505fe7da 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mongodb/MongoDBCapacityExpansionIT.java @@ -21,6 +21,7 @@ import static cn.edu.tsinghua.iginx.thrift.StorageEngineType.mongodb; +import cn.edu.tsinghua.iginx.exception.SessionException; import cn.edu.tsinghua.iginx.integration.controller.Controller; import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.constant.Constant; @@ -510,4 +511,39 @@ protected void shutdownDatabase(int port) { protected void startDatabase(int port) { shutOrRestart(port, false, "mongodb"); } + + @Override + protected void testPathOverlappedDataNotOverlapped() throws SessionException { + // before + String statement = "select status from mn.wf01.wt01;"; + String expected = + "ResultSets:\n" + + "+----------+-------------------+\n" + + "| key|mn.wf01.wt01.status|\n" + + "+----------+-------------------+\n" + + "|4294967296| 11111111|\n" + + "|8589934592| 22222222|\n" + + "+----------+-------------------+\n" + + "Total line number = 2\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + + String insert = + "insert into mn.wf01.wt01 (key, status) values (10, 33333333), (100, 44444444);"; + session.executeSql(insert); + + // after + statement = "select status from mn.wf01.wt01;"; + expected = + "ResultSets:\n" + + "+----------+-------------------+\n" + + "| key|mn.wf01.wt01.status|\n" + + "+----------+-------------------+\n" + + "| 10| 33333333|\n" + + "| 100| 44444444|\n" + + "|4294967296| 11111111|\n" + + "|8589934592| 22222222|\n" + + "+----------+-------------------+\n" + + "Total line number = 4\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mysql/MySQLCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mysql/MySQLCapacityExpansionIT.java index e660d93e54..46ee1f42bc 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mysql/MySQLCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/mysql/MySQLCapacityExpansionIT.java @@ -22,6 +22,7 @@ import static cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools.executeShellScript; import static org.junit.Assert.fail; +import cn.edu.tsinghua.iginx.exception.SessionException; import cn.edu.tsinghua.iginx.integration.controller.Controller; import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.constant.Constant; @@ -103,4 +104,39 @@ private void testFloatData() { valuesList = Arrays.asList(Arrays.asList(44.55F)); SQLTestTools.executeAndCompare(session, statement, pathList, valuesList); } + + @Override + protected void testPathOverlappedDataNotOverlapped() throws SessionException { + // before + String statement = "select status from mn.wf01.wt01;"; + String expected = + "ResultSets:\n" + + "+-----------------+-------------------+\n" + + "| key|mn.wf01.wt01.status|\n" + + "+-----------------+-------------------+\n" + + "|32690615153702352| 11111111|\n" + + "|33357770565002400| 22222222|\n" + + "+-----------------+-------------------+\n" + + "Total line number = 2\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + + String insert = + "insert into mn.wf01.wt01 (key, status) values (10, 33333333), (100, 44444444);"; + session.executeSql(insert); + + // after + statement = "select status from mn.wf01.wt01;"; + expected = + "ResultSets:\n" + + "+-----------------+-------------------+\n" + + "| key|mn.wf01.wt01.status|\n" + + "+-----------------+-------------------+\n" + + "| 10| 33333333|\n" + + "| 100| 44444444|\n" + + "|32690615153702352| 11111111|\n" + + "|33357770565002400| 22222222|\n" + + "+-----------------+-------------------+\n" + + "Total line number = 4\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java index b3fa51f3c8..51809c4892 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/postgresql/PostgreSQLCapacityExpansionIT.java @@ -22,6 +22,7 @@ import static cn.edu.tsinghua.iginx.integration.expansion.utils.SQLTestTools.executeShellScript; import static org.junit.Assert.fail; +import cn.edu.tsinghua.iginx.exception.SessionException; import cn.edu.tsinghua.iginx.integration.controller.Controller; import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.constant.Constant; @@ -193,4 +194,39 @@ private void changeParams(int port, String oldPw, String newPw) { fail("Fail to update postgresql params."); } } + + @Override + protected void testPathOverlappedDataNotOverlapped() throws SessionException { + // before + String statement = "select status from mn.wf01.wt01;"; + String expected = + "ResultSets:\n" + + "+-----------------+-------------------+\n" + + "| key|mn.wf01.wt01.status|\n" + + "+-----------------+-------------------+\n" + + "|32690615153702352| 11111111|\n" + + "|33357770565002400| 22222222|\n" + + "+-----------------+-------------------+\n" + + "Total line number = 2\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + + String insert = + "insert into mn.wf01.wt01 (key, status) values (10, 33333333), (100, 44444444);"; + session.executeSql(insert); + + // after + statement = "select status from mn.wf01.wt01;"; + expected = + "ResultSets:\n" + + "+-----------------+-------------------+\n" + + "| key|mn.wf01.wt01.status|\n" + + "+-----------------+-------------------+\n" + + "| 10| 33333333|\n" + + "| 100| 44444444|\n" + + "|32690615153702352| 11111111|\n" + + "|33357770565002400| 22222222|\n" + + "+-----------------+-------------------+\n" + + "Total line number = 4\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/redis/RedisCapacityExpansionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/redis/RedisCapacityExpansionIT.java index 224cf90180..3c16914160 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/redis/RedisCapacityExpansionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/expansion/redis/RedisCapacityExpansionIT.java @@ -21,6 +21,7 @@ import static cn.edu.tsinghua.iginx.thrift.StorageEngineType.redis; +import cn.edu.tsinghua.iginx.exception.SessionException; import cn.edu.tsinghua.iginx.integration.controller.Controller; import cn.edu.tsinghua.iginx.integration.expansion.BaseCapacityExpansionIT; import cn.edu.tsinghua.iginx.integration.expansion.constant.Constant; @@ -238,4 +239,38 @@ protected void shutdownDatabase(int port) { protected void startDatabase(int port) { shutOrRestart(port, false, "redis"); } + + protected void testPathOverlappedDataNotOverlapped() throws SessionException { + // before + String statement = "select status from mn.wf01.wt01;"; + String expected = + "ResultSets:\n" + + "+---+-------------------+\n" + + "|key|mn.wf01.wt01.status|\n" + + "+---+-------------------+\n" + + "| 0| 22222222|\n" + + "| 1| 11111111|\n" + + "+---+-------------------+\n" + + "Total line number = 2\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + + String insert = + "insert into mn.wf01.wt01 (key, status) values (10, \"33333333\"), (100, \"44444444\");"; + session.executeSql(insert); + + // after + statement = "select status from mn.wf01.wt01;"; + expected = + "ResultSets:\n" + + "+---+-------------------+\n" + + "|key|mn.wf01.wt01.status|\n" + + "+---+-------------------+\n" + + "| 0| 22222222|\n" + + "| 1| 11111111|\n" + + "| 10| 33333333|\n" + + "|100| 44444444|\n" + + "+---+-------------------+\n" + + "Total line number = 4\n"; + SQLTestTools.executeAndCompare(session, statement, expected); + } } diff --git a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java index 922d15486b..538d2be3f2 100644 --- a/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java +++ b/test/src/test/java/cn/edu/tsinghua/iginx/integration/func/sql/SQLSessionIT.java @@ -3305,6 +3305,98 @@ public void testJoin() { executor.executeAndCompare(statement, expected); } + @Test + public void testJoinByKey() { + String insert = + "insert into test1(key, a, b) values (0, 1, 1.5), (1, 2, 2.5), (3, 4, 4.5), (4, 5, 5.5);"; + executor.execute(insert); + + insert = + "insert into test2(key, a, b) values (0, 1, \"aaa\"), (2, 3, \"bbb\"), (4, 5, \"ccc\"), (6, 7, \"ddd\");"; + executor.execute(insert); + + String statement = "select * from test1 join test2 using key;"; + String expected = + "ResultSets:\n" + + "+---+-------+-------+-------+-------+\n" + + "|key|test1.a|test1.b|test2.a|test2.b|\n" + + "+---+-------+-------+-------+-------+\n" + + "| 0| 1| 1.5| 1| aaa|\n" + + "| 4| 5| 5.5| 5| ccc|\n" + + "+---+-------+-------+-------+-------+\n" + + "Total line number = 2\n"; + executor.executeAndCompare(statement, expected); + + statement = "select * from test1 left join test2 using key;"; + expected = + "ResultSets:\n" + + "+---+-------+-------+-------+-------+\n" + + "|key|test1.a|test1.b|test2.a|test2.b|\n" + + "+---+-------+-------+-------+-------+\n" + + "| 0| 1| 1.5| 1| aaa|\n" + + "| 1| 2| 2.5| null| null|\n" + + "| 3| 4| 4.5| null| null|\n" + + "| 4| 5| 5.5| 5| ccc|\n" + + "+---+-------+-------+-------+-------+\n" + + "Total line number = 4\n"; + executor.executeAndCompare(statement, expected); + + statement = "select * from test1 left join test2 using key where key > 2;"; + expected = + "ResultSets:\n" + + "+---+-------+-------+-------+-------+\n" + + "|key|test1.a|test1.b|test2.a|test2.b|\n" + + "+---+-------+-------+-------+-------+\n" + + "| 3| 4| 4.5| null| null|\n" + + "| 4| 5| 5.5| 5| ccc|\n" + + "+---+-------+-------+-------+-------+\n" + + "Total line number = 2\n"; + executor.executeAndCompare(statement, expected); + + statement = "select * from test1 right join test2 using key;"; + expected = + "ResultSets:\n" + + "+---+-------+-------+-------+-------+\n" + + "|key|test1.a|test1.b|test2.a|test2.b|\n" + + "+---+-------+-------+-------+-------+\n" + + "| 0| 1| 1.5| 1| aaa|\n" + + "| 2| null| null| 3| bbb|\n" + + "| 4| 5| 5.5| 5| ccc|\n" + + "| 6| null| null| 7| ddd|\n" + + "+---+-------+-------+-------+-------+\n" + + "Total line number = 4\n"; + executor.executeAndCompare(statement, expected); + + statement = "select * from test1 full join test2 using key;"; + expected = + "ResultSets:\n" + + "+---+-------+-------+-------+-------+\n" + + "|key|test1.a|test1.b|test2.a|test2.b|\n" + + "+---+-------+-------+-------+-------+\n" + + "| 0| 1| 1.5| 1| aaa|\n" + + "| 1| 2| 2.5| null| null|\n" + + "| 2| null| null| 3| bbb|\n" + + "| 3| 4| 4.5| null| null|\n" + + "| 4| 5| 5.5| 5| ccc|\n" + + "| 6| null| null| 7| ddd|\n" + + "+---+-------+-------+-------+-------+\n" + + "Total line number = 6\n"; + executor.executeAndCompare(statement, expected); + + statement = "select * from test1 full join test2 using key where key < 2 or key > 4;"; + expected = + "ResultSets:\n" + + "+---+-------+-------+-------+-------+\n" + + "|key|test1.a|test1.b|test2.a|test2.b|\n" + + "+---+-------+-------+-------+-------+\n" + + "| 0| 1| 1.5| 1| aaa|\n" + + "| 1| 2| 2.5| null| null|\n" + + "| 6| null| null| 7| ddd|\n" + + "+---+-------+-------+-------+-------+\n" + + "Total line number = 3\n"; + executor.executeAndCompare(statement, expected); + } + @Test public void testMultiJoin() { String insert = @@ -9213,4 +9305,28 @@ public void testTransformKeyColumn() { + "Total line number = 2\n"; executor.executeAndCompare(statement, expected); } + + @Test + public void testExtract() { + String insert = + "insert into t(key, a) values (0, 1700000000000), (1, 1705000000000), (2, 1710000000000), (3, 1715000000000), (4, 1720000000000);"; + executor.execute(insert); + + String statement = + "select extract(a, \"year\") as year, extract(a, \"month\") as month, extract(a, \"day\") as day, " + + "extract(a, \"hour\") as hour, extract(a, \"minute\") as minute, extract(a, \"second\") as second from t;"; + String expected = + "ResultSets:\n" + + "+---+----+-----+---+----+------+------+\n" + + "|key|year|month|day|hour|minute|second|\n" + + "+---+----+-----+---+----+------+------+\n" + + "| 0|2023| 11| 14| 22| 13| 20|\n" + + "| 1|2024| 1| 11| 19| 6| 40|\n" + + "| 2|2024| 3| 9| 16| 0| 0|\n" + + "| 3|2024| 5| 6| 12| 53| 20|\n" + + "| 4|2024| 7| 3| 9| 46| 40|\n" + + "+---+----+-----+---+----+------+------+\n" + + "Total line number = 5\n"; + executor.executeAndCompare(statement, expected); + } }