diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java b/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
new file mode 100644
index 0000000000..5fc8dc24cb
--- /dev/null
+++ b/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptTable;
+
+/**
+ * Modified based on calcite's source code org.apache.calcite.rel.metadata.RelColumnOrigin
+ *
+ * Modification point:
+ *
+ * - add transform field and related code.
+ *
+ *
+ * @description: RelColumnOrigin is a data structure describing one of the origins of an
+ * output column produced by a relational expression.
+ * @author: HamaWhite
+ */
+public class RelColumnOrigin {
+ // ~ Instance fields --------------------------------------------------------
+
+ private final RelOptTable originTable;
+
+ private final int iOriginColumn;
+
+ private final boolean isDerived;
+
+ /**
+ * Stores the expression for data conversion, i.e. which source table fields
+ * are transformed by which expression into the target field.
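+ * Example (illustrative, mirroring the unit test below): for a target field
+ * defined as a||c, the stored transform is "||(a, c)".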
+ */
+ private String transform;
+
+ // ~ Constructors -----------------------------------------------------------
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ }
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, String transform) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ this.transform = transform;
+ }
+
+ // ~ Methods ----------------------------------------------------------------
+
+ /**
+ * Returns table of origin.
+ */
+ public RelOptTable getOriginTable() {
+ return originTable;
+ }
+
+ /**
+ * Returns the 0-based index of column in origin table; whether this ordinal
+ * is flattened or unflattened depends on whether UDT flattening has already
+ * been performed on the relational expression which produced this
+ * description.
+ */
+ public int getOriginColumnOrdinal() {
+ return iOriginColumn;
+ }
+
+ /**
+ * Consider the query <code>select a+b as c, d as e from t</code>. The
+ * output column c has two origins (a and b), both of them derived. The
+ * output column e has one origin (d), which is not derived.
+ *
+ * @return false if value taken directly from column in origin table; true
+ * otherwise
+ */
+ public boolean isDerived() {
+ return isDerived;
+ }
+
+ public String getTransform() {
+ return transform;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RelColumnOrigin)) {
+ return false;
+ }
+ RelColumnOrigin other = (RelColumnOrigin) obj;
+ return originTable.getQualifiedName().equals(other.originTable.getQualifiedName())
+ && (iOriginColumn == other.iOriginColumn)
+ && (isDerived == other.isDerived);
+ }
+
+ @Override
+ public int hashCode() {
+ return originTable.getQualifiedName().hashCode() + iOriginColumn + (isDerived ? 313 : 0);
+ }
+}
diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java b/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
index b8b0a7aed2..5c8aae002a 100644
--- a/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
+++ b/dinky-client/dinky-client-1.14/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
@@ -36,7 +36,7 @@
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
@@ -48,33 +48,47 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Modified based on calcite's source code org.apache.calcite.rel.metadata.RelMdColumnOrigins
*
- * Modification point: 1. Support lookup join, add method getColumnOrigins(Snapshot
- * rel,RelMetadataQuery mq, int iOutputColumn) 2. Support watermark, add method
- * getColumnOrigins(SingleRel rel,RelMetadataQuery mq, int iOutputColumn) 3. Support table function,
- * add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) 4. Support
- * field AS LOCALTIMESTAMP, modify method getColumnOrigins(Calc rel, RelMetadataQuery mq, int
- * iOutputColumn) 5. Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int
- * iOutputColumn) 6. Support ROW_NUMBER(), add method getColumnOrigins(Window rel, RelMetadataQuery
- * mq, int iOutputColumn)*
+ * Modification point:
+ *
+ * - Support lookup join, add method getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support watermark, add method getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support table function, add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support transform, add method createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet, String transform, boolean originTransform), and related code
+ * - Support field AS LOCALTIMESTAMP, modify method getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support PROCTIME() as the first field, add method computeIndexWithOffset, used by getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ *
*
- * @description: RelMdColumnOrigins supplies a default implementation of {@link
- * RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
- * @version: 1.0.0
+ * @description: RelMdColumnOrigins supplies a default implementation of {@link RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
+ * @author: HamaWhite
*/
public class RelMdColumnOrigins implements MetadataHandler<BuiltInMetadata.ColumnOrigin> {
+ private static final Logger LOG = LoggerFactory.getLogger(RelMdColumnOrigins.class);
+
+ public static final String DELIMITER = ".";
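+ // Joins name parts in generated transforms: the UDTF transform below is built as
+ // <table function call> + DELIMITER + <output field>, and shortened qualified names
+ // such as "ST.a" use it as well (illustrative comment).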
+
public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.COLUMN_ORIGIN.method, new RelMdColumnOrigins());
@@ -98,10 +112,10 @@ public Set<RelColumnOrigin> getColumnOrigins(Aggregate rel, RelMetadataQuery mq,
// Aggregate columns are derived from input columns
AggregateCall call = rel.getAggCallList().get(iOutputColumn - rel.getGroupCount());
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (Integer iInput : call.getArgList()) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(rel.getInput(), iInput);
- inputSet = createDerivedColumnOrigins(inputSet);
+ inputSet = createDerivedColumnOrigins(inputSet, call.toString(), true);
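+ // Illustrative example: for SELECT MAX(a) the call renders as "MAX($0)",
+ // which computeTransform later rewrites to "MAX(a)".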
if (inputSet != null) {
set.addAll(inputSet);
}
@@ -132,7 +146,9 @@ public Set<RelColumnOrigin> getColumnOrigins(Join rel, RelMetadataQuery mq, int
return set;
}
- /** Support the field blood relationship of table function */
+ /**
+ * Support the field lineage of table function
+ */
public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) {
List<RelDataTypeField> leftFieldList = rel.getLeft().getRowType().getFieldList();
@@ -142,68 +158,126 @@ public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq,
if (iOutputColumn < nLeftColumns) {
set = mq.getColumnOrigins(rel.getLeft(), iOutputColumn);
} else {
- // get the field name of the left table configured in the Table Function on the right
- TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
- RexCall rexCall = (RexCall) tableFunctionScan.getCall();
- // support only one field in table function
- RexFieldAccess rexFieldAccess = (RexFieldAccess) rexCall.operands.get(0);
- String fieldName = rexFieldAccess.getField().getName();
-
- int leftFieldIndex = 0;
- for (int i = 0; i < nLeftColumns; i++) {
- if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
- leftFieldIndex = i;
- break;
+ if (rel.getRight() instanceof TableFunctionScan) {
+ // get the field name of the left table configured in the Table Function on the right
+ TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
+ RexCall rexCall = (RexCall) tableFunctionScan.getCall();
+ // support only one field in table function
+ RexFieldAccess rexFieldAccess =
+ (RexFieldAccess) rexCall.getOperands().get(0);
+ String fieldName = rexFieldAccess.getField().getName();
+
+ int leftFieldIndex = 0;
+ for (int i = 0; i < nLeftColumns; i++) {
+ if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
+ leftFieldIndex = i;
+ break;
+ }
}
+ /**
+ * Get the fields from the left table, don't go to
+ * getColumnOrigins(TableFunctionScan rel,RelMetadataQuery mq, int iOutputColumn),
+ * otherwise the return is null, and the UDTF field origin cannot be parsed
+ */
+ set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
+
+ // process transform for udtf
+ String transform = rexCall.toString().replace(rexFieldAccess.toString(), fieldName)
+ + DELIMITER
+ + tableFunctionScan.getRowType().getFieldNames().get(iOutputColumn - nLeftColumns);
+ set = createDerivedColumnOrigins(set, transform, false);
+ } else {
+ set = mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns);
}
- /**
- * Get the fields from the left table, don't go to getColumnOrigins(TableFunctionScan
- * rel,RelMetadataQuery mq, int iOutputColumn), otherwise the return is null, and the
- * UDTF field origin cannot be parsed
- */
- set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
}
return set;
}
public Set<RelColumnOrigin> getColumnOrigins(SetOp rel, RelMetadataQuery mq, int iOutputColumn) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (RelNode input : rel.getInputs()) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, iOutputColumn);
if (inputSet == null) {
- return null;
+ return Collections.emptySet();
}
set.addAll(inputSet);
}
return set;
}
- /** Support the field blood relationship of lookup join */
+ /**
+ * Support the field lineage of lookup join
+ */
public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
- /** Support the field blood relationship of watermark */
+ /**
+ * Support the field lineage of watermark
+ */
public Set<RelColumnOrigin> getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
/**
- * Support field blood relationship of CEP. The first column is the field after PARTITION BY,
- * and the other columns come from the measures in Match
+ * Support for new fields in the source table similar to those created with the LOCALTIMESTAMP function
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
+ final RelNode input = rel.getInput();
+ RexNode rexNode = rel.getProjects().get(iOutputColumn);
+
+ if (rexNode instanceof RexInputRef) {
+ // Direct reference: no derivation added.
+ RexInputRef inputRef = (RexInputRef) rexNode;
+ int index = inputRef.getIndex();
+ if (input instanceof TableScan) {
+ index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
+ }
+ return mq.getColumnOrigins(input, index);
+ } else if (input instanceof TableScan
+ && rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty()) {
+ return mq.getColumnOrigins(input, iOutputColumn);
+ }
+ // Anything else is a derivation, possibly from multiple columns.
+ final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
+ return createDerivedColumnOrigins(set, rexNode.toString(), true);
+ }
+
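+ // Worked example (illustrative): for a source table declared as
+ // (proctime AS PROCTIME(), a STRING), the projects over the scan are
+ // [PROCTIME(), $0]; output column 1 references $0, one zero-operand call
+ // precedes it, so the origin index becomes 0 + 1 = 1, i.e. column "a".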
+ private int computeIndexWithOffset(List<RexNode> projects, int baseIndex, int iOutputColumn) {
+ int offset = 0;
+ for (int index = 0; index < iOutputColumn; index++) {
+ RexNode rexNode = projects.get(index);
+ if ((rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty())) {
+ offset += 1;
+ }
+ }
+ return baseIndex + offset;
+ }
+
+ /**
+ * Support field lineage of CEP.
+ * The first column is the field after PARTITION BY, and the other columns come from the measures in Match
*/
public Set<RelColumnOrigin> getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn) {
- if (iOutputColumn == 0) {
+ int orderCount = rel.getOrderKeys().getKeys().size();
+
+ if (iOutputColumn < orderCount) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
final RelNode input = rel.getInput();
- RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - 1);
+ RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - orderCount);
RexPatternFieldRef rexPatternFieldRef = searchRexPatternFieldRef(rexNode);
if (rexPatternFieldRef != null) {
- return mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ final Set<RelColumnOrigin> set = mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ String originTransform = rexNode instanceof RexCall
+ ? ((RexCall) rexNode).getOperands().get(0).toString()
+ : null;
+ return createDerivedColumnOrigins(set, originTransform, true);
}
- return null;
+ return Collections.emptySet();
}
private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
@@ -219,46 +293,6 @@ private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
return null;
}
- /** Support the field blood relationship of ROW_NUMBER() */
- public Set<RelColumnOrigin> getColumnOrigins(Window rel, RelMetadataQuery mq, int iOutputColumn) {
- final RelNode input = rel.getInput();
- /**
- * Haven't found a good way to judge whether the field comes from window, for the time
- * being, first judge by parsing the string
- */
- String fieldName = rel.getRowType().getFieldNames().get(iOutputColumn);
- // for example: "w1$o0"
- if (fieldName.startsWith("w") && fieldName.contains("$")) {
- int groupIndex = Integer.parseInt(fieldName.substring(1, fieldName.indexOf("$")));
- final Set<RelColumnOrigin> set = new LinkedHashSet<>();
- if (!rel.groups.isEmpty()) {
- Window.Group group = rel.groups.get(groupIndex);
- // process partition by keys
- group.keys.asList().forEach(index -> set.addAll(mq.getColumnOrigins(input, index)));
- // process order by keys
- group.orderKeys
- .getFieldCollations()
- .forEach(e -> set.addAll(mq.getColumnOrigins(input, e.getFieldIndex())));
- }
- return set;
- }
- return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
- }
-
- public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
- final RelNode input = rel.getInput();
- RexNode rexNode = rel.getProjects().get(iOutputColumn);
-
- if (rexNode instanceof RexInputRef) {
- // Direct reference: no derivation added.
- RexInputRef inputRef = (RexInputRef) rexNode;
- return mq.getColumnOrigins(input, inputRef.getIndex());
- }
- // Anything else is a derivation, possibly from multiple columns.
- final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
- return createDerivedColumnOrigins(set);
- }
-
public Set<RelColumnOrigin> getColumnOrigins(Calc rel, final RelMetadataQuery mq, int iOutputColumn) {
final RelNode input = rel.getInput();
final RexShuttle rexShuttle = new RexShuttle() {
@@ -277,30 +311,6 @@ public RexNode visitLocalRef(RexLocalRef localRef) {
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
return mq.getColumnOrigins(input, inputRef.getIndex());
- } else if (rexNode instanceof RexCall && ((RexCall) rexNode).operands.isEmpty()) {
- // support for new fields in the source table similar to those created with the
- // LOCALTIMESTAMP function
- TableSourceTable table = ((TableSourceTable) rel.getInput().getTable());
- if (table != null) {
- String targetFieldName = rel.getProgram()
- .getOutputRowType()
- .getFieldList()
- .get(iOutputColumn)
- .getName();
- List<String> fieldList =
- table.catalogTable().getResolvedSchema().getColumnNames();
-
- int index = -1;
- for (int i = 0; i < fieldList.size(); i++) {
- if (fieldList.get(i).equalsIgnoreCase(targetFieldName)) {
- index = i;
- break;
- }
- }
- if (index != -1) {
- return Collections.singleton(new RelColumnOrigin(table, index, false));
- }
- }
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
@@ -324,14 +334,14 @@ public Set<RelColumnOrigin> getColumnOrigins(Exchange rel, RelMetadataQuery mq,
}
public Set<RelColumnOrigin> getColumnOrigins(TableFunctionScan rel, RelMetadataQuery mq, int iOutputColumn) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
Set<RelColumnMapping> mappings = rel.getColumnMappings();
if (mappings == null) {
- if (rel.getInputs().size() > 0) {
+ if (!rel.getInputs().isEmpty()) {
// This is a non-leaf transformation: say we don't
// know about origins, because there are probably
// columns below.
- return null;
+ return Collections.emptySet();
} else {
// This is a leaf transformation: say there are for sure no
// column origins.
@@ -346,7 +356,7 @@ public Set<RelColumnOrigin> getColumnOrigins(TableFunctionScan rel, RelMetadataQ
final int column = mapping.iInputColumn;
Set<RelColumnOrigin> origins = mq.getColumnOrigins(input, column);
if (origins == null) {
- return null;
+ return Collections.emptySet();
}
if (mapping.derived) {
origins = createDerivedColumnOrigins(origins);
@@ -357,18 +367,19 @@ public Set getColumnOrigins(TableFunctionScan rel, RelMetadataQ
}
// Catch-all rule when none of the others apply.
+ @SuppressWarnings("squid:S1172")
public Set<RelColumnOrigin> getColumnOrigins(RelNode rel, RelMetadataQuery mq, int iOutputColumn) {
// NOTE jvs 28-Mar-2006: We may get this wrong for a physical table
// expression which supports projections. In that case,
// it's up to the plugin writer to override with the
// correct information.
- if (rel.getInputs().size() > 0) {
+ if (!rel.getInputs().isEmpty()) {
// No generic logic available for non-leaf rels.
- return null;
+ return Collections.emptySet();
}
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
RelOptTable table = rel.getTable();
if (table == null) {
@@ -383,7 +394,7 @@ public Set<RelColumnOrigin> getColumnOrigins(RelNode rel, RelMetadataQuery mq, i
// names.) This detection assumes the table expression doesn't handle
// rename as well.
if (table.getRowType() != rel.getRowType()) {
- return null;
+ return Collections.emptySet();
}
set.add(new RelColumnOrigin(table, iOutputColumn, false));
@@ -392,9 +403,9 @@ public Set<RelColumnOrigin> getColumnOrigins(RelNode rel, RelMetadataQuery mq, i
private Set<RelColumnOrigin> createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet) {
if (inputSet == null) {
- return null;
+ return Collections.emptySet();
}
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (RelColumnOrigin rco : inputSet) {
RelColumnOrigin derived = new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true);
set.add(derived);
@@ -402,10 +413,113 @@ private Set<RelColumnOrigin> createDerivedColumnOrigins(Set<RelColumnOrigin> inp
return set;
}
+ private Set<RelColumnOrigin> createDerivedColumnOrigins(
+ Set<RelColumnOrigin> inputSet, String transform, boolean originTransform) {
+ if (inputSet == null || inputSet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+
+ String finalTransform = originTransform ? computeTransform(inputSet, transform) : transform;
+ for (RelColumnOrigin rco : inputSet) {
+ RelColumnOrigin derived =
+ new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true, finalTransform);
+ set.add(derived);
+ }
+ return set;
+ }
+
+ /**
+ * Replace the $-prefixed index variables in the transform with the real source field names
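+ * Illustrative example: "||($0, $1)" with resolved source fields [a, c] becomes "||(a, c)".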
+ */
+ private String computeTransform(Set<RelColumnOrigin> inputSet, String transform) {
+ LOG.debug("origin transform: {}", transform);
+ Pattern pattern = Pattern.compile("\\$\\d+");
+ Matcher matcher = pattern.matcher(transform);
+
+ Set<String> operandSet = new LinkedHashSet<>();
+ while (matcher.find()) {
+ operandSet.add(matcher.group());
+ }
+
+ if (operandSet.isEmpty()) {
+ LOG.info("operandSet is empty");
+ return null;
+ }
+ if (inputSet.size() != operandSet.size()) {
+ LOG.warn(
+ "The number [{}] of fields in the source tables are not equal to operands [{}]",
+ inputSet.size(),
+ operandSet.size());
+ return null;
+ }
+
+ Map<String, String> sourceColumnMap = new HashMap<>();
+ Iterator<String> iterator = optimizeSourceColumnSet(inputSet).iterator();
+ operandSet.forEach(e -> sourceColumnMap.put(e, iterator.next()));
+ LOG.debug("sourceColumnMap: {}", sourceColumnMap);
+
+ matcher = pattern.matcher(transform);
+ String temp;
+ while (matcher.find()) {
+ temp = matcher.group();
+ transform = transform.replace(temp, sourceColumnMap.get(temp));
+ }
+
+ // temporary special treatment
+ transform = transform.replace("_UTF-16LE", "");
+ LOG.debug("transform: {}", transform);
+ return transform;
+ }
+
+ /**
+ * Increase the readability of transform.
+ * If catalog, database and table are all the same, return only the field.
+ * If the catalog and database are the same, return the table and field.
+ * If only the catalog is the same, return the database, table and field.
+ * Otherwise, return all four parts.
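+ * Illustrative example: default_catalog.default_database.ST.a and
+ * default_catalog.default_database.ST.c share catalog, database and table,
+ * so they collapse to "a" and "c".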
+ */
+ private Set<String> optimizeSourceColumnSet(Set<RelColumnOrigin> inputSet) {
+ Set<String> catalogSet = new HashSet<>();
+ Set<String> databaseSet = new HashSet<>();
+ Set<String> tableSet = new HashSet<>();
+ Set<List<String>> qualifiedSet = new LinkedHashSet<>();
+ for (RelColumnOrigin rco : inputSet) {
+ RelOptTable originTable = rco.getOriginTable();
+ List<String> qualifiedName = originTable.getQualifiedName();
+
+ // catalog,database,table,field
+ List<String> qualifiedList = new ArrayList<>(qualifiedName);
+ catalogSet.add(qualifiedName.get(0));
+ databaseSet.add(qualifiedName.get(1));
+ tableSet.add(qualifiedName.get(2));
+
+ String field = rco.getTransform() != null
+ ? rco.getTransform()
+ : originTable.getRowType().getFieldNames().get(rco.getOriginColumnOrdinal());
+ qualifiedList.add(field);
+ qualifiedSet.add(qualifiedList);
+ }
+ if (catalogSet.size() == 1 && databaseSet.size() == 1 && tableSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> e.get(3));
+ } else if (catalogSet.size() == 1 && databaseSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(2, 4)));
+ } else if (catalogSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(1, 4)));
+ } else {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e));
+ }
+ }
+
+ private Set<String> optimizeName(Set<List<String>> qualifiedSet, Function<List<String>, String> mapper) {
+ return qualifiedSet.stream().map(mapper).collect(Collectors.toSet());
+ }
+
private Set<RelColumnOrigin> getMultipleColumns(RexNode rexNode, RelNode input, final RelMetadataQuery mq) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
final RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+ @Override
public Void visitInputRef(RexInputRef inputRef) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, inputRef.getIndex());
if (inputSet != null) {
diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
index be1682c8a4..022897a9ab 100644
--- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
+++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
@@ -22,7 +22,6 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
-import org.dinky.utils.FlinkStreamProgramWithoutPhysical;
import org.dinky.utils.LineageContext;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -68,7 +67,6 @@
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.delegation.DefaultExecutor;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.types.Row;
@@ -94,8 +92,6 @@
*/
public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {
- private final FlinkChainedProgram flinkChainedProgram;
-
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
@@ -117,8 +113,6 @@ public CustomTableEnvironmentImpl(
isStreamingMode,
userClassLoader));
this.executor = executor;
- this.flinkChainedProgram =
- FlinkStreamProgramWithoutPhysical.buildProgram((Configuration) executionEnvironment.getConfiguration());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
@@ -365,8 +359,7 @@ public void createTemporaryView(String path, DataStream<T> dataStream, Strin
@Override
public List<LineageRel> getLineage(String statement) {
- LineageContext lineageContext =
- new LineageContext(flinkChainedProgram, (TableEnvironmentImpl) streamTableEnvironment);
+ LineageContext lineageContext = new LineageContext((TableEnvironmentImpl) streamTableEnvironment);
return lineageContext.getLineage(statement);
}
diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java
deleted file mode 100644
index 60755f0700..0000000000
--- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.dinky.utils;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.hep.HepMatchOrder;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
-import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets;
-
-/**
- * FlinkStreamProgramWithoutPhysical
- *
- * @since 2022/8/20 23:33
- */
-public class FlinkStreamProgramWithoutPhysical {
-
- private static final String SUBQUERY_REWRITE = "subquery_rewrite";
- private static final String TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite";
- private static final String DECORRELATE = "decorrelate";
- private static final String DEFAULT_REWRITE = "default_rewrite";
- private static final String PREDICATE_PUSHDOWN = "predicate_pushdown";
- private static final String JOIN_REORDER = "join_reorder";
- private static final String PROJECT_REWRITE = "project_rewrite";
- private static final String LOGICAL = "logical";
- private static final String LOGICAL_REWRITE = "logical_rewrite";
-
- public static FlinkChainedProgram buildProgram(Configuration config) {
- FlinkChainedProgram chainedProgram = new FlinkChainedProgram();
-
- // rewrite sub-queries to joins
- chainedProgram.addLast(
- SUBQUERY_REWRITE,
- FlinkGroupProgramBuilder.newBuilder()
- // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_REF_RULES())
- .build(),
- "convert table references before rewriting sub-queries to" + " semi-join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.SEMI_JOIN_RULES())
- .build(),
- "rewrite sub-queries to semi-join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES())
- .build(),
- "sub-queries remove")
- // convert RelOptTableImpl (which exists in SubQuery before) to
- // FlinkRelOptTable
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_REF_RULES())
- .build(),
- "convert table references after sub-queries removed")
- .build());
-
- // rewrite special temporal join plan
- chainedProgram.addLast(
- TEMPORAL_JOIN_REWRITE,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.EXPAND_PLAN_RULES())
- .build(),
- "convert correlate to temporal table join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.POST_EXPAND_CLEAN_UP_RULES())
- .build(),
- "convert enumerable table scan")
- .build());
-
- // query decorrelation
- chainedProgram.addLast(
- DECORRELATE,
- FlinkGroupProgramBuilder.newBuilder()
- // rewrite before decorrelation
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PRE_DECORRELATION_RULES())
- .build(),
- "pre-rewrite before decorrelation")
- .addProgram(new FlinkDecorrelateProgram(), "")
- .build());
-
- // default rewrite, includes: predicate simplification, expression reduction, window
- // properties rewrite, etc.
- chainedProgram.addLast(
- DEFAULT_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.DEFAULT_REWRITE_RULES())
- .build());
-
- // rule based optimization: push down predicate(s) in where clause, so it only needs to read
- // the required data
- chainedProgram.addLast(
- PREDICATE_PUSHDOWN,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES())
- .build(),
- "filter rules")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.FILTER_TABLESCAN_PUSHDOWN_RULES())
- .build(),
- "push predicate into table scan")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PRUNE_EMPTY_RULES())
- .build(),
- "prune empty after predicate push down")
- .build());
-
- // join reorder
- if (config.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
- chainedProgram.addLast(
- JOIN_REORDER,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_REORDER_PREPARE_RULES())
- .build(),
- "merge join into MultiJoin")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_REORDER_RULES())
- .build(),
- "do join reorder")
- .build());
- }
-
- // project rewrite
- chainedProgram.addLast(
- PROJECT_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PROJECT_RULES())
- .build());
-
- // optimize the logical plan
- chainedProgram.addLast(
- LOGICAL,
- FlinkVolcanoProgramBuilder.newBuilder()
- .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES())
- .setRequiredOutputTraits(new Convention.Impl[] {FlinkConventions.LOGICAL()})
- .build());
-
- // logical rewrite
- chainedProgram.addLast(
- LOGICAL_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.LOGICAL_REWRITE())
- .build());
-
- return chainedProgram;
- }
-}
diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/LineageContext.java b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/LineageContext.java
index d8ca8d6944..577c515bc7 100644
--- a/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/LineageContext.java
+++ b/dinky-client/dinky-client-1.14/src/main/java/org/dinky/utils/LineageContext.java
@@ -27,22 +27,13 @@
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
-import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
-import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.trait.MiniBatchInterval;
import java.util.ArrayList;
import java.util.List;
@@ -55,11 +46,9 @@
*/
public class LineageContext {
- private final FlinkChainedProgram flinkChainedProgram;
private final TableEnvironmentImpl tableEnv;
- public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentImpl tableEnv) {
- this.flinkChainedProgram = flinkChainedProgram;
+ public LineageContext(TableEnvironmentImpl tableEnv) {
this.tableEnv = tableEnv;
}
@@ -69,11 +58,8 @@ public List<LineageRel> getLineage(String statement) {
String sinkTable = parsed.getField(0);
RelNode oriRelNode = parsed.getField(1);
- // 2. Optimize original relNode to generate Optimized Logical Plan
- RelNode optRelNode = optimize(oriRelNode);
-
- // 3. Build lineage based from RelMetadataQuery
- return buildFiledLineageResult(sinkTable, optRelNode);
+ // 2. Build lineage based on RelMetadataQuery
+ return buildFiledLineageResult(sinkTable, oriRelNode);
}
private Tuple2<String, RelNode> parseStatement(String sql) {
@@ -94,66 +80,6 @@ private Tuple2<String, RelNode> parseStatement(String sql) {
}
}
- /** Calling each program's optimize method in sequence. */
- private RelNode optimize(RelNode relNode) {
- return flinkChainedProgram.optimize(relNode, new StreamOptimizeContext() {
-
- @Override
- public boolean isBatchMode() {
- return false;
- }
-
- @Override
- public TableConfig getTableConfig() {
- return tableEnv.getConfig();
- }
-
- @Override
- public FunctionCatalog getFunctionCatalog() {
- return getPlanner().getFlinkContext().getFunctionCatalog();
- }
-
- @Override
- public CatalogManager getCatalogManager() {
- return tableEnv.getCatalogManager();
- }
-
- @Override
- public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() {
- return getPlanner().getFlinkContext().getSqlExprToRexConverterFactory();
- }
-
- @Override
- public <C> C unwrap(Class<C> clazz) {
- return getPlanner().getFlinkContext().unwrap(clazz);
- }
-
- @Override
- public FlinkRelBuilder getFlinkRelBuilder() {
- return getPlanner().getRelBuilder();
- }
-
- @Override
- public boolean needFinalTimeIndicatorConversion() {
- return true;
- }
-
- @Override
- public boolean isUpdateBeforeRequired() {
- return false;
- }
-
- @Override
- public MiniBatchInterval getMiniBatchInterval() {
- return MiniBatchInterval.NONE;
- }
-
- private PlannerBase getPlanner() {
- return (PlannerBase) tableEnv.getPlanner();
- }
- });
- }
-
/** Check that the sizes of the query and sink field lists match */
private void validateSchema(String sinkTable, RelNode relNode, List<String> sinkFieldList) {
List<String> queryFieldList = relNode.getRowType().getFieldNames();
@@ -197,7 +123,8 @@ private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRe
String sourceColumn = fieldNames.get(ordinal);
// add record
- resultList.add(LineageRel.build(sourceTable, sourceColumn, sinkTable, targetColumn));
+ resultList.add(LineageRel.build(
+ sourceTable, sourceColumn, sinkTable, targetColumn, relColumnOrigin.getTransform()));
}
}
}
diff --git a/dinky-client/dinky-client-1.14/src/test/java/org/dinky/utils/LineageContextTest.java b/dinky-client/dinky-client-1.14/src/test/java/org/dinky/utils/LineageContextTest.java
new file mode 100644
index 0000000000..fa22d6eec6
--- /dev/null
+++ b/dinky-client/dinky-client-1.14/src/test/java/org/dinky/utils/LineageContextTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.dinky.data.model.LineageRel;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * @description: LineageContextTest
+ * @author: HamaWhite
+ */
+public class LineageContextTest {
+
+ private static TableEnvironmentImpl tableEnv;
+ private static LineageContext context;
+
+ @BeforeClass
+ public static void setUp() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
+
+ EnvironmentSettings settings =
+ EnvironmentSettings.newInstance().inStreamingMode().build();
+ tableEnv = (TableEnvironmentImpl) StreamTableEnvironment.create(env, settings);
+
+ context = new LineageContext(tableEnv);
+ }
+
+ @Before
+ public void init() {
+ // create table ST
+ tableEnv.executeSql("DROP TABLE IF EXISTS ST");
+ tableEnv.executeSql("CREATE TABLE ST ( " + " a STRING ,"
+ + " b STRING ,"
+ + " c STRING "
+ + ") WITH ( "
+ + " 'connector' = 'datagen' ,"
+ + " 'rows-per-second' = '1' "
+ + ")");
+
+ // create table TT
+ tableEnv.executeSql("DROP TABLE IF EXISTS TT");
+ tableEnv.executeSql("CREATE TABLE TT ( " + " A STRING ,"
+ + " B STRING "
+ + ") WITH ( "
+ + " 'connector' = 'print' "
+ + ")");
+ }
+
+ @Test
+ public void testGetLineage() {
+ List<LineageRel> actualList = context.getLineage("INSERT INTO TT select a||c A ,b||c B from ST");
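+ // each expected row: {source table, source column, sink table, sink column, transform}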
+ String[][] expectedArray = {
+ {"ST", "a", "TT", "A", "||(a, c)"},
+ {"ST", "c", "TT", "A", "||(a, c)"},
+ {"ST", "b", "TT", "B", "||(b, c)"},
+ {"ST", "c", "TT", "B", "||(b, c)"}
+ };
+
+ List<LineageRel> expectedList = buildResult(expectedArray);
+ assertEquals(expectedList, actualList);
+ }
+
+ private List<LineageRel> buildResult(String[][] expectedArray) {
+ return Stream.of(expectedArray)
+ .map(e -> {
+ String transform = e.length == 5 ? e[4] : null;
+ return new LineageRel(
+ "default_catalog",
+ "default_database",
+ e[0],
+ e[1],
+ "default_catalog",
+ "default_database",
+ e[2],
+ e[3],
+ transform);
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git a/dinky-client/dinky-client-1.15/pom.xml b/dinky-client/dinky-client-1.15/pom.xml
index d614979679..003f87ff90 100644
--- a/dinky-client/dinky-client-1.15/pom.xml
+++ b/dinky-client/dinky-client-1.15/pom.xml
@@ -67,6 +67,11 @@
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java b/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
new file mode 100644
index 0000000000..5fc8dc24cb
--- /dev/null
+++ b/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptTable;
+
+/**
+ * Modified based on calcite's source code org.apache.calcite.rel.metadata.RelColumnOrigin
+ *
+ * Modification point:
+ *
+ * - add transform field and related code.
+ *
+ *
+ * @description: RelColumnOrigin is a data structure describing one of the origins of an
+ * output column produced by a relational expression.
+ * @author: HamaWhite
+ */
+public class RelColumnOrigin {
+ // ~ Instance fields --------------------------------------------------------
+
+ private final RelOptTable originTable;
+
+ private final int iOriginColumn;
+
+ private final boolean isDerived;
+
+ /**
+ * Stores the expression for data conversion, i.e. which source table fields
+ * are transformed by which expression into the target field.
+ */
+ private String transform;
+
+ // ~ Constructors -----------------------------------------------------------
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ }
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, String transform) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ this.transform = transform;
+ }
+
+ // ~ Methods ----------------------------------------------------------------
+
+ /**
+ * Returns table of origin.
+ */
+ public RelOptTable getOriginTable() {
+ return originTable;
+ }
+
+ /**
+ * Returns the 0-based index of column in origin table; whether this ordinal
+ * is flattened or unflattened depends on whether UDT flattening has already
+ * been performed on the relational expression which produced this
+ * description.
+ */
+ public int getOriginColumnOrdinal() {
+ return iOriginColumn;
+ }
+
+ /**
+ * Consider the query <code>select a+b as c, d as e from t</code>. The
+ * output column c has two origins (a and b), both of them derived. The
+ * output column e has one origin (d), which is not derived.
+ *
+ * @return false if value taken directly from column in origin table; true
+ * otherwise
+ */
+ public boolean isDerived() {
+ return isDerived;
+ }
+
+ public String getTransform() {
+ return transform;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RelColumnOrigin)) {
+ return false;
+ }
+ RelColumnOrigin other = (RelColumnOrigin) obj;
+ return originTable.getQualifiedName().equals(other.originTable.getQualifiedName())
+ && (iOriginColumn == other.iOriginColumn)
+ && (isDerived == other.isDerived);
+ }
+
+ @Override
+ public int hashCode() {
+ return originTable.getQualifiedName().hashCode() + iOriginColumn + (isDerived ? 313 : 0);
+ }
+}
diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java b/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
index 6bdad4d186..5c8aae002a 100644
--- a/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
+++ b/dinky-client/dinky-client-1.15/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
@@ -36,7 +36,7 @@
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
@@ -48,33 +48,47 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Modified based on calcite's source code org.apache.calcite.rel.metadata.RelMdColumnOrigins
*
- * Modification point: 1. Support lookup join, add method getColumnOrigins(Snapshot
- * rel,RelMetadataQuery mq, int iOutputColumn) 2. Support watermark, add method
- * getColumnOrigins(SingleRel rel,RelMetadataQuery mq, int iOutputColumn) 3. Support table function,
- * add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) 4. Support
- * field AS LOCALTIMESTAMP, modify method getColumnOrigins(Calc rel, RelMetadataQuery mq, int
- * iOutputColumn) 5. Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int
- * iOutputColumn) 6. Support ROW_NUMBER(), add method getColumnOrigins(Window rel, RelMetadataQuery
- * mq, int iOutputColumn)*
+ * Modification point:
+ *
+ * - Support lookup join, add method getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support watermark, add method getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support table function, add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support transform, add method createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet, String transform, boolean originTransform), and related code
+ * - Support field AS LOCALTIMESTAMP, modify method getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ * - Support PROCTIME() as the first field, add method computeIndexWithOffset, used by getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ *
*
- * @description: RelMdColumnOrigins supplies a default implementation of {@link
- * RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
- * @version: 1.0.0
+ * @description: RelMdColumnOrigins supplies a default implementation of {@link RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
+ * @author: HamaWhite
*/
public class RelMdColumnOrigins implements MetadataHandler<BuiltInMetadata.ColumnOrigin> {
+ private static final Logger LOG = LoggerFactory.getLogger(RelMdColumnOrigins.class);
+
+ public static final String DELIMITER = ".";
+
public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.COLUMN_ORIGIN.method, new RelMdColumnOrigins());
@@ -98,10 +112,10 @@ public Set<RelColumnOrigin> getColumnOrigins(Aggregate rel, RelMetadataQuery mq,
// Aggregate columns are derived from input columns
AggregateCall call = rel.getAggCallList().get(iOutputColumn - rel.getGroupCount());
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (Integer iInput : call.getArgList()) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(rel.getInput(), iInput);
- inputSet = createDerivedColumnOrigins(inputSet);
+ inputSet = createDerivedColumnOrigins(inputSet, call.toString(), true);
if (inputSet != null) {
set.addAll(inputSet);
}
@@ -132,7 +146,9 @@ public Set<RelColumnOrigin> getColumnOrigins(Join rel, RelMetadataQuery mq, int
return set;
}
- /** Support the field blood relationship of table function */
+ /**
+ * Support the field lineage of table function
+ */
public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) {
List<RelDataTypeField> leftFieldList = rel.getLeft().getRowType().getFieldList();
@@ -142,68 +158,126 @@ public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq,
if (iOutputColumn < nLeftColumns) {
set = mq.getColumnOrigins(rel.getLeft(), iOutputColumn);
} else {
- // get the field name of the left table configured in the Table Function on the right
- TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
- RexCall rexCall = (RexCall) tableFunctionScan.getCall();
- // support only one field in table function
- RexFieldAccess rexFieldAccess = (RexFieldAccess) rexCall.operands.get(0);
- String fieldName = rexFieldAccess.getField().getName();
-
- int leftFieldIndex = 0;
- for (int i = 0; i < nLeftColumns; i++) {
- if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
- leftFieldIndex = i;
- break;
+ if (rel.getRight() instanceof TableFunctionScan) {
+ // get the field name of the left table configured in the Table Function on the right
+ TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
+ RexCall rexCall = (RexCall) tableFunctionScan.getCall();
+ // support only one field in table function
+ RexFieldAccess rexFieldAccess =
+ (RexFieldAccess) rexCall.getOperands().get(0);
+ String fieldName = rexFieldAccess.getField().getName();
+
+ int leftFieldIndex = 0;
+ for (int i = 0; i < nLeftColumns; i++) {
+ if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
+ leftFieldIndex = i;
+ break;
+ }
}
+ /**
+ * Get the fields from the left table, don't go to
+ * getColumnOrigins(TableFunctionScan rel,RelMetadataQuery mq, int iOutputColumn),
+ * otherwise the return is null, and the UDTF field origin cannot be parsed
+ */
+ set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
+
+ // process transform for udtf
+ String transform = rexCall.toString().replace(rexFieldAccess.toString(), fieldName)
+ + DELIMITER
+ + tableFunctionScan.getRowType().getFieldNames().get(iOutputColumn - nLeftColumns);
+ set = createDerivedColumnOrigins(set, transform, false);
+ } else {
+ set = mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns);
}
- /**
- * Get the fields from the left table, don't go to getColumnOrigins(TableFunctionScan
- * rel,RelMetadataQuery mq, int iOutputColumn), otherwise the return is null, and the
- * UDTF field origin cannot be parsed
- */
- set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
}
return set;
}
public Set<RelColumnOrigin> getColumnOrigins(SetOp rel, RelMetadataQuery mq, int iOutputColumn) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (RelNode input : rel.getInputs()) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, iOutputColumn);
if (inputSet == null) {
- return null;
+ return Collections.emptySet();
}
set.addAll(inputSet);
}
return set;
}
- /** Support the field blood relationship of lookup join */
+ /**
+ * Support the field lineage of lookup join
+ */
public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
- /** Support the field blood relationship of watermark */
+ /**
+ * Support the field lineage of watermark
+ */
public Set<RelColumnOrigin> getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
/**
- * Support field blood relationship of CEP. The first column is the field after PARTITION BY,
- * and the other columns come from the measures in Match
+ * Support for new fields in the source table similar to those created with the LOCALTIMESTAMP function
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
+ final RelNode input = rel.getInput();
+ RexNode rexNode = rel.getProjects().get(iOutputColumn);
+
+ if (rexNode instanceof RexInputRef) {
+ // Direct reference: no derivation added.
+ RexInputRef inputRef = (RexInputRef) rexNode;
+ int index = inputRef.getIndex();
+ if (input instanceof TableScan) {
+ index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
+ }
+ return mq.getColumnOrigins(input, index);
+ } else if (input instanceof TableScan
+ && rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty()) {
+ return mq.getColumnOrigins(input, iOutputColumn);
+ }
+ // Anything else is a derivation, possibly from multiple columns.
+ final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
+ return createDerivedColumnOrigins(set, rexNode.toString(), true);
+ }
+
+ private int computeIndexWithOffset(List<RexNode> projects, int baseIndex, int iOutputColumn) {
+ int offset = 0;
+ for (int index = 0; index < iOutputColumn; index++) {
+ RexNode rexNode = projects.get(index);
+ if ((rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty())) {
+ offset += 1;
+ }
+ }
+ return baseIndex + offset;
+ }
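Why the offset is needed: when the projection sits directly on a TableScan and contains zero-operand calls such as PROCTIME(), the scan's physical row type no longer lines up with the catalog table's column list, so a direct reference must be shifted right by the number of zero-operand calls that precede it. A standalone sketch of the same counting logic, assuming a DDL schema like (p AS PROCTIME(), a, b) (hypothetical, not from this patch):

    import java.util.Arrays;
    import java.util.List;

    public class IndexOffsetSketch {
        // Zero-operand calls are modeled as strings ending in "()"; direct references as "$n".
        static int computeIndexWithOffset(List<String> projects, int baseIndex, int iOutputColumn) {
            int offset = 0;
            for (int i = 0; i < iOutputColumn; i++) {
                if (projects.get(i).endsWith("()")) {
                    offset++;
                }
            }
            return baseIndex + offset;
        }

        public static void main(String[] args) {
            // Projection on top of the scan: [PROCTIME(), $0, $1]; the scan row type is (a, b).
            List<String> projects = Arrays.asList("PROCTIME()", "$0", "$1");
            // Output column 1 is "$0" (physical column a): ordinal 0 + offset 1 = 1,
            // which is a's position in the catalog table (p, a, b).
            System.out.println(computeIndexWithOffset(projects, 0, 1)); // prints 1
        }
    }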
+
+ /**
+ * Support field blood relationship of CEP.
+ * The first column is the field after PARTITION BY, and the other columns come from the measures in Match
*/
public Set<RelColumnOrigin> getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn) {
- if (iOutputColumn == 0) {
+ int orderCount = rel.getOrderKeys().getKeys().size();
+
+ if (iOutputColumn < orderCount) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
final RelNode input = rel.getInput();
- RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - 1);
+ RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - orderCount);
RexPatternFieldRef rexPatternFieldRef = searchRexPatternFieldRef(rexNode);
if (rexPatternFieldRef != null) {
- return mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ final Set<RelColumnOrigin> set = mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ String originTransform = rexNode instanceof RexCall
+ ? ((RexCall) rexNode).getOperands().get(0).toString()
+ : null;
+ return createDerivedColumnOrigins(set, originTransform, true);
}
- return null;
+ return Collections.emptySet();
}
private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
@@ -219,46 +293,6 @@ private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
return null;
}
- /** Support the field blood relationship of ROW_NUMBER() */
- public Set<RelColumnOrigin> getColumnOrigins(Window rel, RelMetadataQuery mq, int iOutputColumn) {
- final RelNode input = rel.getInput();
- /**
- * Haven't found a good way to judge whether the field comes from window, for the time
- * being, first judge by parsing the string
- */
- String fieldName = rel.getRowType().getFieldNames().get(iOutputColumn);
- // for example: "w1$o0"
- if (fieldName.startsWith("w") && fieldName.contains("$")) {
- int groupIndex = Integer.parseInt(fieldName.substring(1, fieldName.indexOf("$")));
- final Set<RelColumnOrigin> set = new LinkedHashSet<>();
- if (!rel.groups.isEmpty()) {
- Window.Group group = rel.groups.get(groupIndex);
- // process partition by keys
- group.keys.asList().forEach(index -> set.addAll(mq.getColumnOrigins(input, index)));
- // process order by keys
- group.orderKeys
- .getFieldCollations()
- .forEach(e -> set.addAll(mq.getColumnOrigins(input, e.getFieldIndex())));
- }
- return set;
- }
- return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
- }
-
- public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
- final RelNode input = rel.getInput();
- RexNode rexNode = rel.getProjects().get(iOutputColumn);
-
- if (rexNode instanceof RexInputRef) {
- // Direct reference: no derivation added.
- RexInputRef inputRef = (RexInputRef) rexNode;
- return mq.getColumnOrigins(input, inputRef.getIndex());
- }
- // Anything else is a derivation, possibly from multiple columns.
- final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
- return createDerivedColumnOrigins(set);
- }
-
public Set<RelColumnOrigin> getColumnOrigins(Calc rel, final RelMetadataQuery mq, int iOutputColumn) {
final RelNode input = rel.getInput();
final RexShuttle rexShuttle = new RexShuttle() {
@@ -277,30 +311,6 @@ public RexNode visitLocalRef(RexLocalRef localRef) {
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
return mq.getColumnOrigins(input, inputRef.getIndex());
- } else if (rexNode instanceof RexCall && ((RexCall) rexNode).operands.isEmpty()) {
- // support for new fields in the source table similar to those created with the
- // LOCALTIMESTAMP function
- TableSourceTable table = ((TableSourceTable) rel.getInput().getTable());
- if (table != null) {
- String targetFieldName = rel.getProgram()
- .getOutputRowType()
- .getFieldList()
- .get(iOutputColumn)
- .getName();
- List<String> fieldList =
- table.contextResolvedTable().getResolvedSchema().getColumnNames();
-
- int index = -1;
- for (int i = 0; i < fieldList.size(); i++) {
- if (fieldList.get(i).equalsIgnoreCase(targetFieldName)) {
- index = i;
- break;
- }
- }
- if (index != -1) {
- return Collections.singleton(new RelColumnOrigin(table, index, false));
- }
- }
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
@@ -324,14 +334,14 @@ public Set getColumnOrigins(Exchange rel, RelMetadataQuery mq,
}
public Set<RelColumnOrigin> getColumnOrigins(TableFunctionScan rel, RelMetadataQuery mq, int iOutputColumn) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
Set<RelColumnMapping> mappings = rel.getColumnMappings();
if (mappings == null) {
- if (rel.getInputs().size() > 0) {
+ if (!rel.getInputs().isEmpty()) {
// This is a non-leaf transformation: say we don't
// know about origins, because there are probably
// columns below.
- return null;
+ return Collections.emptySet();
} else {
// This is a leaf transformation: say there are fer sure no
// column origins.
@@ -346,7 +356,7 @@ public Set getColumnOrigins(TableFunctionScan rel, RelMetadataQ
final int column = mapping.iInputColumn;
Set<RelColumnOrigin> origins = mq.getColumnOrigins(input, column);
if (origins == null) {
- return null;
+ return Collections.emptySet();
}
if (mapping.derived) {
origins = createDerivedColumnOrigins(origins);
@@ -357,18 +367,19 @@ public Set getColumnOrigins(TableFunctionScan rel, RelMetadataQ
}
// Catch-all rule when none of the others apply.
+ @SuppressWarnings("squid:S1172")
public Set<RelColumnOrigin> getColumnOrigins(RelNode rel, RelMetadataQuery mq, int iOutputColumn) {
// NOTE jvs 28-Mar-2006: We may get this wrong for a physical table
// expression which supports projections. In that case,
// it's up to the plugin writer to override with the
// correct information.
- if (rel.getInputs().size() > 0) {
+ if (!rel.getInputs().isEmpty()) {
// No generic logic available for non-leaf rels.
- return null;
+ return Collections.emptySet();
}
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
RelOptTable table = rel.getTable();
if (table == null) {
@@ -383,7 +394,7 @@ public Set getColumnOrigins(RelNode rel, RelMetadataQuery mq, i
// names.) This detection assumes the table expression doesn't handle
// rename as well.
if (table.getRowType() != rel.getRowType()) {
- return null;
+ return Collections.emptySet();
}
set.add(new RelColumnOrigin(table, iOutputColumn, false));
@@ -392,9 +403,9 @@ public Set getColumnOrigins(RelNode rel, RelMetadataQuery mq, i
private Set<RelColumnOrigin> createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet) {
if (inputSet == null) {
- return null;
+ return Collections.emptySet();
}
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (RelColumnOrigin rco : inputSet) {
RelColumnOrigin derived = new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true);
set.add(derived);
@@ -402,10 +413,113 @@ private Set createDerivedColumnOrigins(Set inp
return set;
}
+ private Set<RelColumnOrigin> createDerivedColumnOrigins(
+ Set<RelColumnOrigin> inputSet, String transform, boolean originTransform) {
+ if (inputSet == null || inputSet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+
+ String finalTransform = originTransform ? computeTransform(inputSet, transform) : transform;
+ for (RelColumnOrigin rco : inputSet) {
+ RelColumnOrigin derived =
+ new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true, finalTransform);
+ set.add(derived);
+ }
+ return set;
+ }
+
+ /**
+ * Replace each variable beginning with $ (e.g. $0) in the transform with the real source field information
+ */
+ private String computeTransform(Set<RelColumnOrigin> inputSet, String transform) {
+ LOG.debug("origin transform: {}", transform);
+ Pattern pattern = Pattern.compile("\\$\\d+");
+ Matcher matcher = pattern.matcher(transform);
+
+ Set<String> operandSet = new LinkedHashSet<>();
+ while (matcher.find()) {
+ operandSet.add(matcher.group());
+ }
+
+ if (operandSet.isEmpty()) {
+ LOG.info("operandSet is empty");
+ return null;
+ }
+ if (inputSet.size() != operandSet.size()) {
+ LOG.warn(
+ "The number [{}] of fields in the source tables are not equal to operands [{}]",
+ inputSet.size(),
+ operandSet.size());
+ return null;
+ }
+
+ Map<String, String> sourceColumnMap = new HashMap<>();
+ Iterator<String> iterator = optimizeSourceColumnSet(inputSet).iterator();
+ operandSet.forEach(e -> sourceColumnMap.put(e, iterator.next()));
+ LOG.debug("sourceColumnMap: {}", sourceColumnMap);
+
+ matcher = pattern.matcher(transform);
+ String temp;
+ while (matcher.find()) {
+ temp = matcher.group();
+ transform = transform.replace(temp, sourceColumnMap.get(temp));
+ }
+
+ // temporary special treatment: strip Calcite's character-set prefix from string literals
+ transform = transform.replace("_UTF-16LE", "");
+ LOG.debug("transform: {}", transform);
+ return transform;
+ }
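A concrete run of the substitution (the placeholder-to-column mapping here is assumed; the values match the test expectations further below): for the projection a||c the transform arrives as ||($0, $1), the pattern \$\d+ collects $0 and $1 in encounter order, and they are replaced by the resolved source columns, yielding ||(a, c):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TransformSketch {
        public static void main(String[] args) {
            String transform = "||($0, $1)";
            // Assumed resolution of the placeholders to source columns.
            Map<String, String> sourceColumnMap = new LinkedHashMap<>();
            sourceColumnMap.put("$0", "a");
            sourceColumnMap.put("$1", "c");

            Matcher matcher = Pattern.compile("\\$\\d+").matcher(transform);
            while (matcher.find()) {
                String operand = matcher.group();
                transform = transform.replace(operand, sourceColumnMap.get(operand));
            }
            System.out.println(transform); // prints "||(a, c)"
        }
    }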
+
+ /**
+ * Increase the readability of transform.
+ * If catalog, database and table are all the same, return only the field.
+ * If catalog and database are the same, return table and field.
+ * If only the catalog is the same, return database, table and field.
+ * Otherwise, return the fully qualified name.
+ */
+ private Set<String> optimizeSourceColumnSet(Set<RelColumnOrigin> inputSet) {
+ Set<String> catalogSet = new HashSet<>();
+ Set<String> databaseSet = new HashSet<>();
+ Set<String> tableSet = new HashSet<>();
+ Set<List<String>> qualifiedSet = new LinkedHashSet<>();
+ for (RelColumnOrigin rco : inputSet) {
+ RelOptTable originTable = rco.getOriginTable();
+ List<String> qualifiedName = originTable.getQualifiedName();
+
+ // catalog,database,table,field
+ List<String> qualifiedList = new ArrayList<>(qualifiedName);
+ catalogSet.add(qualifiedName.get(0));
+ databaseSet.add(qualifiedName.get(1));
+ tableSet.add(qualifiedName.get(2));
+
+ String field = rco.getTransform() != null
+ ? rco.getTransform()
+ : originTable.getRowType().getFieldNames().get(rco.getOriginColumnOrdinal());
+ qualifiedList.add(field);
+ qualifiedSet.add(qualifiedList);
+ }
+ if (catalogSet.size() == 1 && databaseSet.size() == 1 && tableSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> e.get(3));
+ } else if (catalogSet.size() == 1 && databaseSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(2, 4)));
+ } else if (catalogSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(1, 4)));
+ } else {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e));
+ }
+ }
+
+ private Set<String> optimizeName(Set<List<String>> qualifiedSet, Function<List<String>, String> mapper) {
+ return qualifiedSet.stream().map(mapper).collect(Collectors.toSet());
+ }
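To make the shortening rule concrete (table and column names are illustrative): with one catalog and one database but two source tables, each qualified name [catalog, database, table, field] collapses to table.field:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class OptimizeNameSketch {
        public static void main(String[] args) {
            Set<List<String>> qualifiedSet = new LinkedHashSet<>(Arrays.asList(
                    Arrays.asList("default_catalog", "default_database", "ST", "a"),
                    Arrays.asList("default_catalog", "default_database", "TT", "b")));
            // Same catalog and database: keep only table and field.
            Function<List<String>, String> mapper = e -> String.join(".", e.subList(2, 4));
            Set<String> result = qualifiedSet.stream().map(mapper).collect(Collectors.toSet());
            System.out.println(result); // e.g. [ST.a, TT.b]
        }
    }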
+
private Set<RelColumnOrigin> getMultipleColumns(RexNode rexNode, RelNode input, final RelMetadataQuery mq) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
final RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+ @Override
public Void visitInputRef(RexInputRef inputRef) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, inputRef.getIndex());
if (inputSet != null) {
diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
index 23eb46eadb..a3320dd770 100644
--- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
+++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
@@ -24,7 +24,6 @@
import org.dinky.assertion.Asserts;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
-import org.dinky.utils.FlinkStreamProgramWithoutPhysical;
import org.dinky.utils.LineageContext;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -64,7 +63,6 @@
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableASOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.types.Row;
import java.util.ArrayList;
@@ -85,8 +83,6 @@
*/
public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {
- private final FlinkChainedProgram flinkChainedProgram;
-
public CustomTableEnvironmentImpl(
CatalogManager catalogManager,
ModuleManager moduleManager,
@@ -107,8 +103,6 @@ public CustomTableEnvironmentImpl(
executor,
isStreamingMode,
userClassLoader));
- this.flinkChainedProgram = FlinkStreamProgramWithoutPhysical.buildProgram(
- (Configuration) getStreamExecutionEnvironment().getConfiguration());
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
@@ -324,8 +318,7 @@ public void createTemporaryView(String path, DataStream dataStream, Strin
@Override
public List<LineageRel> getLineage(String statement) {
- LineageContext lineageContext =
- new LineageContext(flinkChainedProgram, (TableEnvironmentImpl) streamTableEnvironment);
+ LineageContext lineageContext = new LineageContext((TableEnvironmentImpl) streamTableEnvironment);
return lineageContext.getLineage(statement);
}
diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java
deleted file mode 100644
index bf4808939d..0000000000
--- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.dinky.utils;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.hep.HepMatchOrder;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
-import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets;
-
-/**
- * FlinkStreamProgramWithoutPhysical
- *
- * @since 2022/11/22
- */
-public class FlinkStreamProgramWithoutPhysical {
-
- private static final String SUBQUERY_REWRITE = "subquery_rewrite";
- private static final String TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite";
- private static final String DECORRELATE = "decorrelate";
- private static final String DEFAULT_REWRITE = "default_rewrite";
- private static final String PREDICATE_PUSHDOWN = "predicate_pushdown";
- private static final String JOIN_REORDER = "join_reorder";
- private static final String PROJECT_REWRITE = "project_rewrite";
- private static final String LOGICAL = "logical";
- private static final String LOGICAL_REWRITE = "logical_rewrite";
-
- public static FlinkChainedProgram buildProgram(Configuration config) {
- FlinkChainedProgram chainedProgram = new FlinkChainedProgram();
-
- // rewrite sub-queries to joins
- chainedProgram.addLast(
- SUBQUERY_REWRITE,
- FlinkGroupProgramBuilder.newBuilder()
- // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_REF_RULES())
- .build(),
- "convert table references before rewriting sub-queries to" + " semi-join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.SEMI_JOIN_RULES())
- .build(),
- "rewrite sub-queries to semi-join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES())
- .build(),
- "sub-queries remove")
- // convert RelOptTableImpl (which exists in SubQuery before) to
- // FlinkRelOptTable
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_REF_RULES())
- .build(),
- "convert table references after sub-queries removed")
- .build());
-
- // rewrite special temporal join plan
- chainedProgram.addLast(
- TEMPORAL_JOIN_REWRITE,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.EXPAND_PLAN_RULES())
- .build(),
- "convert correlate to temporal table join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.POST_EXPAND_CLEAN_UP_RULES())
- .build(),
- "convert enumerable table scan")
- .build());
-
- // query decorrelation
- chainedProgram.addLast(
- DECORRELATE,
- FlinkGroupProgramBuilder.newBuilder()
- // rewrite before decorrelation
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PRE_DECORRELATION_RULES())
- .build(),
- "pre-rewrite before decorrelation")
- .addProgram(new FlinkDecorrelateProgram(), "")
- .build());
-
- // default rewrite, includes: predicate simplification, expression reduction, window
- // properties rewrite, etc.
- chainedProgram.addLast(
- DEFAULT_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.DEFAULT_REWRITE_RULES())
- .build());
-
- // rule based optimization: push down predicate(s) in where clause, so it only needs to read
- // the required data
- chainedProgram.addLast(
- PREDICATE_PUSHDOWN,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_PREDICATE_REWRITE_RULES())
- .build(),
- "join predicate rewrite")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES())
- .build(),
- "filter rules")
- .setIterations(5)
- .build(),
- "predicate rewrite")
- .addProgram(
- // PUSH_PARTITION_DOWN_RULES should always be in front of
- // PUSH_FILTER_DOWN_RULES
- // to prevent PUSH_FILTER_DOWN_RULES from consuming the predicates
- // in partitions
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PUSH_PARTITION_DOWN_RULES())
- .build(),
- "push down partitions into table scan")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PUSH_FILTER_DOWN_RULES())
- .build(),
- "push down filters into table scan")
- .build(),
- "push predicate into table scan")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PRUNE_EMPTY_RULES())
- .build(),
- "prune empty after predicate push down")
- .build());
-
- // join reorder
- if (config.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
- chainedProgram.addLast(
- JOIN_REORDER,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_REORDER_PREPARE_RULES())
- .build(),
- "merge join into MultiJoin")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_REORDER_RULES())
- .build(),
- "do join reorder")
- .build());
- }
-
- // project rewrite
- chainedProgram.addLast(
- PROJECT_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PROJECT_RULES())
- .build());
-
- // optimize the logical plan
- chainedProgram.addLast(
- LOGICAL,
- FlinkVolcanoProgramBuilder.newBuilder()
- .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES())
- .setRequiredOutputTraits(new Convention.Impl[] {FlinkConventions.LOGICAL()})
- .build());
-
- // logical rewrite
- chainedProgram.addLast(
- LOGICAL_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.LOGICAL_REWRITE())
- .build());
-
- return chainedProgram;
- }
-}
diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/LineageContext.java b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/LineageContext.java
index 407ca3f78e..d707ade42a 100644
--- a/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/LineageContext.java
+++ b/dinky-client/dinky-client-1.15/src/main/java/org/dinky/utils/LineageContext.java
@@ -27,23 +27,13 @@
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.SinkModifyOperation;
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
-import org.apache.flink.table.planner.calcite.SqlExprToRexConverterFactory;
-import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
-import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.trait.MiniBatchInterval;
import java.util.ArrayList;
import java.util.List;
@@ -56,11 +46,9 @@
*/
public class LineageContext {
- private final FlinkChainedProgram flinkChainedProgram;
private final TableEnvironmentImpl tableEnv;
- public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentImpl tableEnv) {
- this.flinkChainedProgram = flinkChainedProgram;
+ public LineageContext(TableEnvironmentImpl tableEnv) {
this.tableEnv = tableEnv;
}
@@ -70,11 +58,8 @@ public List getLineage(String statement) {
String sinkTable = parsed.getField(0);
RelNode oriRelNode = parsed.getField(1);
- // 2. Optimize original relNode to generate Optimized Logical Plan
- RelNode optRelNode = optimize(oriRelNode);
-
- // 3. Build lineage based from RelMetadataQuery
- return buildFiledLineageResult(sinkTable, optRelNode);
+ // 2. Build lineage based on RelMetadataQuery
+ return buildFiledLineageResult(sinkTable, oriRelNode);
}
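With the optimization phase removed, lineage now comes straight from the original RelNode. A minimal usage sketch, assuming a TableEnvironmentImpl with tables ST and TT registered as in LineageContextTest below:

    LineageContext context = new LineageContext(tableEnv);
    List<LineageRel> lineage = context.getLineage("INSERT INTO TT select a||c A ,b||c B from ST");
    // Each LineageRel carries source/target table, column and the transform expression.
    lineage.forEach(System.out::println);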
private Tuple2<String, RelNode> parseStatement(String sql) {
@@ -96,76 +81,6 @@ private Tuple2 parseStatement(String sql) {
}
}
- /** Calling each program's optimize method in sequence. */
- private RelNode optimize(RelNode relNode) {
- return flinkChainedProgram.optimize(relNode, new StreamOptimizeContext() {
-
- @Override
- public boolean isBatchMode() {
- return false;
- }
-
- @Override
- public TableConfig getTableConfig() {
- return tableEnv.getConfig();
- }
-
- @Override
- public FunctionCatalog getFunctionCatalog() {
- return getPlanner().getFlinkContext().getFunctionCatalog();
- }
-
- @Override
- public CatalogManager getCatalogManager() {
- return tableEnv.getCatalogManager();
- }
-
- @Override
- public ModuleManager getModuleManager() {
- return getPlanner().getFlinkContext().getModuleManager();
- }
-
- @Override
- public SqlExprToRexConverterFactory getSqlExprToRexConverterFactory() {
- return null;
- }
-
- @Override
- public <C> C unwrap(Class<C> clazz) {
- return getPlanner().getFlinkContext().unwrap(clazz);
- }
-
- @Override
- public FlinkRelBuilder getFlinkRelBuilder() {
- return getPlanner().getRelBuilder();
- }
-
- @Override
- public boolean isUpdateBeforeRequired() {
- return false;
- }
-
- @Override
- public MiniBatchInterval getMiniBatchInterval() {
- return MiniBatchInterval.NONE;
- }
-
- @Override
- public boolean needFinalTimeIndicatorConversion() {
- return true;
- }
-
- @Override
- public ClassLoader getClassLoader() {
- return getPlanner().getFlinkContext().getClassLoader();
- }
-
- private PlannerBase getPlanner() {
- return (PlannerBase) tableEnv.getPlanner();
- }
- });
- }
-
/** Check that the sizes of the query and sink fields match */
private void validateSchema(String sinkTable, RelNode relNode, List<String> sinkFieldList) {
List<String> queryFieldList = relNode.getRowType().getFieldNames();
@@ -209,7 +124,8 @@ private List buildFiledLineageResult(String sinkTable, RelNode optRe
String sourceColumn = fieldNames.get(ordinal);
// add record
- resultList.add(LineageRel.build(sourceTable, sourceColumn, sinkTable, targetColumn));
+ resultList.add(LineageRel.build(
+ sourceTable, sourceColumn, sinkTable, targetColumn, relColumnOrigin.getTransform()));
}
}
}
diff --git a/dinky-client/dinky-client-1.15/src/test/java/org/dinky/utils/LineageContextTest.java b/dinky-client/dinky-client-1.15/src/test/java/org/dinky/utils/LineageContextTest.java
new file mode 100644
index 0000000000..fa22d6eec6
--- /dev/null
+++ b/dinky-client/dinky-client-1.15/src/test/java/org/dinky/utils/LineageContextTest.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.dinky.data.model.LineageRel;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * @description: LineageContextTest
+ * @author: HamaWhite
+ */
+public class LineageContextTest {
+
+ private static TableEnvironmentImpl tableEnv;
+ private static LineageContext context;
+
+ @BeforeClass
+ public static void setUp() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(new Configuration());
+
+ EnvironmentSettings settings =
+ EnvironmentSettings.newInstance().inStreamingMode().build();
+ tableEnv = (TableEnvironmentImpl) StreamTableEnvironment.create(env, settings);
+
+ context = new LineageContext(tableEnv);
+ }
+
+ @Before
+ public void init() {
+ // create table ST
+ tableEnv.executeSql("DROP TABLE IF EXISTS ST");
+ tableEnv.executeSql("CREATE TABLE ST ( " + " a STRING ,"
+ + " b STRING ,"
+ + " c STRING "
+ + ") WITH ( "
+ + " 'connector' = 'datagen' ,"
+ + " 'rows-per-second' = '1' "
+ + ")");
+
+ // create table TT
+ tableEnv.executeSql("DROP TABLE IF EXISTS TT");
+ tableEnv.executeSql("CREATE TABLE TT ( " + " A STRING ,"
+ + " B STRING "
+ + ") WITH ( "
+ + " 'connector' = 'print' "
+ + ")");
+ }
+
+ @Test
+ public void testGetLineage() {
+ List<LineageRel> actualList = context.getLineage("INSERT INTO TT select a||c A ,b||c B from ST");
+ String[][] expectedArray = {
+ {"ST", "a", "TT", "A", "||(a, c)"},
+ {"ST", "c", "TT", "A", "||(a, c)"},
+ {"ST", "b", "TT", "B", "||(b, c)"},
+ {"ST", "c", "TT", "B", "||(b, c)"}
+ };
+
+ List<LineageRel> expectedList = buildResult(expectedArray);
+ assertEquals(expectedList, actualList);
+ }
+
+ private List<LineageRel> buildResult(String[][] expectedArray) {
+ return Stream.of(expectedArray)
+ .map(e -> {
+ String transform = e.length == 5 ? e[4] : null;
+ return new LineageRel(
+ "default_catalog",
+ "default_database",
+ e[0],
+ e[1],
+ "default_catalog",
+ "default_database",
+ e[2],
+ e[3],
+ transform);
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git a/dinky-client/dinky-client-1.16/pom.xml b/dinky-client/dinky-client-1.16/pom.xml
index 770b7351cb..649b33bdec 100644
--- a/dinky-client/dinky-client-1.16/pom.xml
+++ b/dinky-client/dinky-client-1.16/pom.xml
@@ -41,6 +41,11 @@
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>provided</scope>
+ </dependency>
diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
new file mode 100644
index 0000000000..5fc8dc24cb
--- /dev/null
+++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptTable;
+
+/**
+ * Modified based on calcite's source code org.apache.calcite.rel.metadata.RelColumnOrigin
+ *
+ * Modification point:
+ *
+ * - add transform field and related code.
+ *
+ *
+ * @description: RelColumnOrigin is a data structure describing one of the origins of an
+ * output column produced by a relational expression.
+ * @author: HamaWhite
+ */
+public class RelColumnOrigin {
+ // ~ Instance fields --------------------------------------------------------
+
+ private final RelOptTable originTable;
+
+ private final int iOriginColumn;
+
+ private final boolean isDerived;
+
+ /**
+ * Stores the expression for data conversion,
+ * i.e. which expression transforms the source table fields into the target field
+ */
+ private String transform;
+
+ // ~ Constructors -----------------------------------------------------------
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ }
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, String transform) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ this.transform = transform;
+ }
+
+ // ~ Methods ----------------------------------------------------------------
+
+ /**
+ * Returns table of origin.
+ */
+ public RelOptTable getOriginTable() {
+ return originTable;
+ }
+
+ /**
+ * Returns the 0-based index of column in origin table; whether this ordinal
+ * is flattened or unflattened depends on whether UDT flattening has already
+ * been performed on the relational expression which produced this
+ * description.
+ */
+ public int getOriginColumnOrdinal() {
+ return iOriginColumn;
+ }
+
+ /**
+ * Consider the query <code>select a+b as c, d as e from t</code>. The
+ * output column c has two origins (a and b), both of them derived. The
+ * output column e has one origin (d), which is not derived.
+ *
+ * @return false if value taken directly from column in origin table; true
+ * otherwise
+ */
+ public boolean isDerived() {
+ return isDerived;
+ }
+
+ public String getTransform() {
+ return transform;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RelColumnOrigin)) {
+ return false;
+ }
+ RelColumnOrigin other = (RelColumnOrigin) obj;
+ return originTable.getQualifiedName().equals(other.originTable.getQualifiedName())
+ && (iOriginColumn == other.iOriginColumn)
+ && (isDerived == other.isDerived);
+ }
+
+ @Override
+ public int hashCode() {
+ return originTable.getQualifiedName().hashCode() + iOriginColumn + (isDerived ? 313 : 0);
+ }
+}
diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
index 6bdad4d186..5c8aae002a 100644
--- a/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
+++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
@@ -36,7 +36,7 @@
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.TableFunctionScan;
import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.core.Window;
+import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexFieldAccess;
@@ -48,33 +48,47 @@
import org.apache.calcite.rex.RexVisitor;
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.util.BuiltInMethod;
-import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Modified based on calcite's source code org.apache.calcite.rel.metadata.RelMdColumnOrigins
*
- * Modification point: 1. Support lookup join, add method getColumnOrigins(Snapshot
- * rel,RelMetadataQuery mq, int iOutputColumn) 2. Support watermark, add method
- * getColumnOrigins(SingleRel rel,RelMetadataQuery mq, int iOutputColumn) 3. Support table function,
- * add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) 4. Support
- * field AS LOCALTIMESTAMP, modify method getColumnOrigins(Calc rel, RelMetadataQuery mq, int
- * iOutputColumn) 5. Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int
- * iOutputColumn) 6. Support ROW_NUMBER(), add method getColumnOrigins(Window rel, RelMetadataQuery
- * mq, int iOutputColumn)*
+ * <p>Modification point:
+ * <ol>
+ *   <li>Support lookup join, add method getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn)
+ *   <li>Support watermark, add method getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn)
+ *   <li>Support table function, add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn)
+ *   <li>Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn)
+ *   <li>Support transform, add method createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet, String transform, boolean originTransform) and related code
+ *   <li>Support field AS LOCALTIMESTAMP, modify method getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ *   <li>Support PROCTIME() as the first field, add method computeIndexWithOffset, used by getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ * </ol>
*
- * @description: RelMdColumnOrigins supplies a default implementation of {@link
- * RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
- * @version: 1.0.0
+ * @description: RelMdColumnOrigins supplies a default implementation of {@link RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
+ * @author: HamaWhite
*/
public class RelMdColumnOrigins implements MetadataHandler<BuiltInMetadata.ColumnOrigin> {
+ private static final Logger LOG = LoggerFactory.getLogger(RelMdColumnOrigins.class);
+
+ public static final String DELIMITER = ".";
+
public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
BuiltInMethod.COLUMN_ORIGIN.method, new RelMdColumnOrigins());
@@ -98,10 +112,10 @@ public Set getColumnOrigins(Aggregate rel, RelMetadataQuery mq,
// Aggregate columns are derived from input columns
AggregateCall call = rel.getAggCallList().get(iOutputColumn - rel.getGroupCount());
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (Integer iInput : call.getArgList()) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(rel.getInput(), iInput);
- inputSet = createDerivedColumnOrigins(inputSet);
+ inputSet = createDerivedColumnOrigins(inputSet, call.toString(), true);
if (inputSet != null) {
set.addAll(inputSet);
}
@@ -132,7 +146,9 @@ public Set getColumnOrigins(Join rel, RelMetadataQuery mq, int
return set;
}
- /** Support the field blood relationship of table function */
+ /**
+ * Support the field blood relationship of table function
+ */
public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) {
List<RelDataTypeField> leftFieldList = rel.getLeft().getRowType().getFieldList();
@@ -142,68 +158,126 @@ public Set getColumnOrigins(Correlate rel, RelMetadataQuery mq,
if (iOutputColumn < nLeftColumns) {
set = mq.getColumnOrigins(rel.getLeft(), iOutputColumn);
} else {
- // get the field name of the left table configured in the Table Function on the right
- TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
- RexCall rexCall = (RexCall) tableFunctionScan.getCall();
- // support only one field in table function
- RexFieldAccess rexFieldAccess = (RexFieldAccess) rexCall.operands.get(0);
- String fieldName = rexFieldAccess.getField().getName();
-
- int leftFieldIndex = 0;
- for (int i = 0; i < nLeftColumns; i++) {
- if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
- leftFieldIndex = i;
- break;
+ if (rel.getRight() instanceof TableFunctionScan) {
+ // get the field name of the left table configured in the Table Function on the right
+ TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
+ RexCall rexCall = (RexCall) tableFunctionScan.getCall();
+ // support only one field in table function
+ RexFieldAccess rexFieldAccess =
+ (RexFieldAccess) rexCall.getOperands().get(0);
+ String fieldName = rexFieldAccess.getField().getName();
+
+ int leftFieldIndex = 0;
+ for (int i = 0; i < nLeftColumns; i++) {
+ if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
+ leftFieldIndex = i;
+ break;
+ }
}
+ /**
+ * Get the fields from the left table; do not delegate to
+ * getColumnOrigins(TableFunctionScan rel, RelMetadataQuery mq, int iOutputColumn),
+ * otherwise it returns null and the UDTF field origin cannot be resolved.
+ */
+ set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
+
+ // process transform for udtf
+ String transform = rexCall.toString().replace(rexFieldAccess.toString(), fieldName)
+ + DELIMITER
+ + tableFunctionScan.getRowType().getFieldNames().get(iOutputColumn - nLeftColumns);
+ set = createDerivedColumnOrigins(set, transform, false);
+ } else {
+ set = mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns);
}
- /**
- * Get the fields from the left table, don't go to getColumnOrigins(TableFunctionScan
- * rel,RelMetadataQuery mq, int iOutputColumn), otherwise the return is null, and the
- * UDTF field origin cannot be parsed
- */
- set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
}
return set;
}
public Set<RelColumnOrigin> getColumnOrigins(SetOp rel, RelMetadataQuery mq, int iOutputColumn) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (RelNode input : rel.getInputs()) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, iOutputColumn);
if (inputSet == null) {
- return null;
+ return Collections.emptySet();
}
set.addAll(inputSet);
}
return set;
}
- /** Support the field blood relationship of lookup join */
+ /**
+ * Support the field blood relationship of lookup join
+ */
public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
- /** Support the field blood relationship of watermark */
+ /**
+ * Support the field blood relationship of watermark
+ */
public Set<RelColumnOrigin> getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
/**
- * Support field blood relationship of CEP. The first column is the field after PARTITION BY,
- * and the other columns come from the measures in Match
+ * Support for new fields in the source table, similar to those created with the LOCALTIMESTAMP function
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
+ final RelNode input = rel.getInput();
+ RexNode rexNode = rel.getProjects().get(iOutputColumn);
+
+ if (rexNode instanceof RexInputRef) {
+ // Direct reference: no derivation added.
+ RexInputRef inputRef = (RexInputRef) rexNode;
+ int index = inputRef.getIndex();
+ if (input instanceof TableScan) {
+ index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
+ }
+ return mq.getColumnOrigins(input, index);
+ } else if (input instanceof TableScan
+ && rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty()) {
+ return mq.getColumnOrigins(input, iOutputColumn);
+ }
+ // Anything else is a derivation, possibly from multiple columns.
+ final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
+ return createDerivedColumnOrigins(set, rexNode.toString(), true);
+ }
+
+ private int computeIndexWithOffset(List<RexNode> projects, int baseIndex, int iOutputColumn) {
+ int offset = 0;
+ for (int index = 0; index < iOutputColumn; index++) {
+ RexNode rexNode = projects.get(index);
+ if ((rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty())) {
+ offset += 1;
+ }
+ }
+ return baseIndex + offset;
+ }
+
+ /**
+ * Support field blood relationship of CEP.
+ * The first column is the field after PARTITION BY, and the other columns come from the measures in Match
*/
public Set<RelColumnOrigin> getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn) {
- if (iOutputColumn == 0) {
+ int orderCount = rel.getOrderKeys().getKeys().size();
+
+ if (iOutputColumn < orderCount) {
return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
final RelNode input = rel.getInput();
- RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - 1);
+ RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - orderCount);
RexPatternFieldRef rexPatternFieldRef = searchRexPatternFieldRef(rexNode);
if (rexPatternFieldRef != null) {
- return mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ final Set<RelColumnOrigin> set = mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ String originTransform = rexNode instanceof RexCall
+ ? ((RexCall) rexNode).getOperands().get(0).toString()
+ : null;
+ return createDerivedColumnOrigins(set, originTransform, true);
}
- return null;
+ return Collections.emptySet();
}
private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
@@ -219,46 +293,6 @@ private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
return null;
}
- /** Support the field blood relationship of ROW_NUMBER() */
- public Set<RelColumnOrigin> getColumnOrigins(Window rel, RelMetadataQuery mq, int iOutputColumn) {
- final RelNode input = rel.getInput();
- /**
- * Haven't found a good way to judge whether the field comes from window, for the time
- * being, first judge by parsing the string
- */
- String fieldName = rel.getRowType().getFieldNames().get(iOutputColumn);
- // for example: "w1$o0"
- if (fieldName.startsWith("w") && fieldName.contains("$")) {
- int groupIndex = Integer.parseInt(fieldName.substring(1, fieldName.indexOf("$")));
- final Set<RelColumnOrigin> set = new LinkedHashSet<>();
- if (!rel.groups.isEmpty()) {
- Window.Group group = rel.groups.get(groupIndex);
- // process partition by keys
- group.keys.asList().forEach(index -> set.addAll(mq.getColumnOrigins(input, index)));
- // process order by keys
- group.orderKeys
- .getFieldCollations()
- .forEach(e -> set.addAll(mq.getColumnOrigins(input, e.getFieldIndex())));
- }
- return set;
- }
- return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
- }
-
- public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
- final RelNode input = rel.getInput();
- RexNode rexNode = rel.getProjects().get(iOutputColumn);
-
- if (rexNode instanceof RexInputRef) {
- // Direct reference: no derivation added.
- RexInputRef inputRef = (RexInputRef) rexNode;
- return mq.getColumnOrigins(input, inputRef.getIndex());
- }
- // Anything else is a derivation, possibly from multiple columns.
- final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
- return createDerivedColumnOrigins(set);
- }
-
public Set<RelColumnOrigin> getColumnOrigins(Calc rel, final RelMetadataQuery mq, int iOutputColumn) {
final RelNode input = rel.getInput();
final RexShuttle rexShuttle = new RexShuttle() {
@@ -277,30 +311,6 @@ public RexNode visitLocalRef(RexLocalRef localRef) {
// Direct reference: no derivation added.
RexInputRef inputRef = (RexInputRef) rexNode;
return mq.getColumnOrigins(input, inputRef.getIndex());
- } else if (rexNode instanceof RexCall && ((RexCall) rexNode).operands.isEmpty()) {
- // support for new fields in the source table similar to those created with the
- // LOCALTIMESTAMP function
- TableSourceTable table = ((TableSourceTable) rel.getInput().getTable());
- if (table != null) {
- String targetFieldName = rel.getProgram()
- .getOutputRowType()
- .getFieldList()
- .get(iOutputColumn)
- .getName();
- List<String> fieldList =
- table.contextResolvedTable().getResolvedSchema().getColumnNames();
-
- int index = -1;
- for (int i = 0; i < fieldList.size(); i++) {
- if (fieldList.get(i).equalsIgnoreCase(targetFieldName)) {
- index = i;
- break;
- }
- }
- if (index != -1) {
- return Collections.singleton(new RelColumnOrigin(table, index, false));
- }
- }
}
// Anything else is a derivation, possibly from multiple columns.
final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
@@ -324,14 +334,14 @@ public Set getColumnOrigins(Exchange rel, RelMetadataQuery mq,
}
public Set<RelColumnOrigin> getColumnOrigins(TableFunctionScan rel, RelMetadataQuery mq, int iOutputColumn) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
Set<RelColumnMapping> mappings = rel.getColumnMappings();
if (mappings == null) {
- if (rel.getInputs().size() > 0) {
+ if (!rel.getInputs().isEmpty()) {
// This is a non-leaf transformation: say we don't
// know about origins, because there are probably
// columns below.
- return null;
+ return Collections.emptySet();
} else {
// This is a leaf transformation: say there are fer sure no
// column origins.
@@ -346,7 +356,7 @@ public Set getColumnOrigins(TableFunctionScan rel, RelMetadataQ
final int column = mapping.iInputColumn;
Set<RelColumnOrigin> origins = mq.getColumnOrigins(input, column);
if (origins == null) {
- return null;
+ return Collections.emptySet();
}
if (mapping.derived) {
origins = createDerivedColumnOrigins(origins);
@@ -357,18 +367,19 @@ public Set getColumnOrigins(TableFunctionScan rel, RelMetadataQ
}
// Catch-all rule when none of the others apply.
+ @SuppressWarnings("squid:S1172")
public Set<RelColumnOrigin> getColumnOrigins(RelNode rel, RelMetadataQuery mq, int iOutputColumn) {
// NOTE jvs 28-Mar-2006: We may get this wrong for a physical table
// expression which supports projections. In that case,
// it's up to the plugin writer to override with the
// correct information.
- if (rel.getInputs().size() > 0) {
+ if (!rel.getInputs().isEmpty()) {
// No generic logic available for non-leaf rels.
- return null;
+ return Collections.emptySet();
}
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
RelOptTable table = rel.getTable();
if (table == null) {
@@ -383,7 +394,7 @@ public Set getColumnOrigins(RelNode rel, RelMetadataQuery mq, i
// names.) This detection assumes the table expression doesn't handle
// rename as well.
if (table.getRowType() != rel.getRowType()) {
- return null;
+ return Collections.emptySet();
}
set.add(new RelColumnOrigin(table, iOutputColumn, false));
@@ -392,9 +403,9 @@ public Set getColumnOrigins(RelNode rel, RelMetadataQuery mq, i
private Set<RelColumnOrigin> createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet) {
if (inputSet == null) {
- return null;
+ return Collections.emptySet();
}
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
for (RelColumnOrigin rco : inputSet) {
RelColumnOrigin derived = new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true);
set.add(derived);
@@ -402,10 +413,113 @@ private Set createDerivedColumnOrigins(Set inp
return set;
}
+ private Set<RelColumnOrigin> createDerivedColumnOrigins(
+ Set<RelColumnOrigin> inputSet, String transform, boolean originTransform) {
+ if (inputSet == null || inputSet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+
+ String finalTransform = originTransform ? computeTransform(inputSet, transform) : transform;
+ for (RelColumnOrigin rco : inputSet) {
+ RelColumnOrigin derived =
+ new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true, finalTransform);
+ set.add(derived);
+ }
+ return set;
+ }
+
+ /**
+ * Replace each variable beginning with $ (e.g. $0) in the transform with the real source field information
+ */
+ private String computeTransform(Set<RelColumnOrigin> inputSet, String transform) {
+ LOG.debug("origin transform: {}", transform);
+ Pattern pattern = Pattern.compile("\\$\\d+");
+ Matcher matcher = pattern.matcher(transform);
+
+ Set<String> operandSet = new LinkedHashSet<>();
+ while (matcher.find()) {
+ operandSet.add(matcher.group());
+ }
+
+ if (operandSet.isEmpty()) {
+ LOG.info("operandSet is empty");
+ return null;
+ }
+ if (inputSet.size() != operandSet.size()) {
+ LOG.warn(
+ "The number [{}] of fields in the source tables are not equal to operands [{}]",
+ inputSet.size(),
+ operandSet.size());
+ return null;
+ }
+
+ Map<String, String> sourceColumnMap = new HashMap<>();
+ Iterator<String> iterator = optimizeSourceColumnSet(inputSet).iterator();
+ operandSet.forEach(e -> sourceColumnMap.put(e, iterator.next()));
+ LOG.debug("sourceColumnMap: {}", sourceColumnMap);
+
+ matcher = pattern.matcher(transform);
+ String temp;
+ while (matcher.find()) {
+ temp = matcher.group();
+ transform = transform.replace(temp, sourceColumnMap.get(temp));
+ }
+
+ // temporary special treatment: strip Calcite's character-set prefix from string literals
+ transform = transform.replace("_UTF-16LE", "");
+ LOG.debug("transform: {}", transform);
+ return transform;
+ }
+
+ /**
+ * Increase the readability of transform.
+ * If catalog, database and table are all the same, return only the field.
+ * If catalog and database are the same, return table and field.
+ * If only the catalog is the same, return database, table and field.
+ * Otherwise, return the fully qualified name.
+ */
+ private Set<String> optimizeSourceColumnSet(Set<RelColumnOrigin> inputSet) {
+ Set<String> catalogSet = new HashSet<>();
+ Set<String> databaseSet = new HashSet<>();
+ Set<String> tableSet = new HashSet<>();
+ Set<List<String>> qualifiedSet = new LinkedHashSet<>();
+ for (RelColumnOrigin rco : inputSet) {
+ RelOptTable originTable = rco.getOriginTable();
+ List<String> qualifiedName = originTable.getQualifiedName();
+
+ // catalog,database,table,field
+ List<String> qualifiedList = new ArrayList<>(qualifiedName);
+ catalogSet.add(qualifiedName.get(0));
+ databaseSet.add(qualifiedName.get(1));
+ tableSet.add(qualifiedName.get(2));
+
+ String field = rco.getTransform() != null
+ ? rco.getTransform()
+ : originTable.getRowType().getFieldNames().get(rco.getOriginColumnOrdinal());
+ qualifiedList.add(field);
+ qualifiedSet.add(qualifiedList);
+ }
+ if (catalogSet.size() == 1 && databaseSet.size() == 1 && tableSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> e.get(3));
+ } else if (catalogSet.size() == 1 && databaseSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(2, 4)));
+ } else if (catalogSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(1, 4)));
+ } else {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e));
+ }
+ }
+
+ private Set<String> optimizeName(Set<List<String>> qualifiedSet, Function<List<String>, String> mapper) {
+ return qualifiedSet.stream().map(mapper).collect(Collectors.toSet());
+ }
+
private Set<RelColumnOrigin> getMultipleColumns(RexNode rexNode, RelNode input, final RelMetadataQuery mq) {
- final Set<RelColumnOrigin> set = new HashSet<>();
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
final RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+ @Override
public Void visitInputRef(RexInputRef inputRef) {
Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, inputRef.getIndex());
if (inputSet != null) {
diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
index 0288e906fc..d8547b223d 100644
--- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
+++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
@@ -23,7 +23,6 @@
import org.dinky.context.DinkyClassLoaderContextHolder;
import org.dinky.data.model.LineageRel;
import org.dinky.data.result.SqlExplainResult;
-import org.dinky.utils.FlinkStreamProgramWithoutPhysical;
import org.dinky.utils.LineageContext;
import org.apache.flink.api.dag.Transformation;
@@ -50,7 +49,6 @@
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.types.Row;
import java.util.ArrayList;
@@ -78,13 +76,10 @@ public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {
private static final Logger log = LoggerFactory.getLogger(CustomTableEnvironmentImpl.class);
- private final FlinkChainedProgram flinkChainedProgram;
private static final ObjectMapper mapper = new ObjectMapper();
public CustomTableEnvironmentImpl(StreamTableEnvironment streamTableEnvironment) {
super(streamTableEnvironment);
- this.flinkChainedProgram = FlinkStreamProgramWithoutPhysical.buildProgram(
- (Configuration) getStreamExecutionEnvironment().getConfiguration());
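+ // the lineage optimizer program is no longer built here; LineageContext
+ // now works directly from the table environment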
}
public static CustomTableEnvironmentImpl create(StreamExecutionEnvironment executionEnvironment) {
@@ -260,8 +255,7 @@ private void setConfiguration(StreamExecutionEnvironment environment, Map<String, Object> config) {
public List<LineageRel> getLineage(String statement) {
- LineageContext lineageContext =
- new LineageContext(flinkChainedProgram, (TableEnvironmentImpl) streamTableEnvironment);
+ LineageContext lineageContext = new LineageContext((TableEnvironmentImpl) streamTableEnvironment);
return lineageContext.getLineage(statement);
}
diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java
deleted file mode 100644
index bf4808939d..0000000000
--- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/FlinkStreamProgramWithoutPhysical.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.dinky.utils;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.hep.HepMatchOrder;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
-import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
-import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets;
-
-/**
- * FlinkStreamProgramWithoutPhysical
- *
- * @since 2022/11/22
- */
-public class FlinkStreamProgramWithoutPhysical {
-
- private static final String SUBQUERY_REWRITE = "subquery_rewrite";
- private static final String TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite";
- private static final String DECORRELATE = "decorrelate";
- private static final String DEFAULT_REWRITE = "default_rewrite";
- private static final String PREDICATE_PUSHDOWN = "predicate_pushdown";
- private static final String JOIN_REORDER = "join_reorder";
- private static final String PROJECT_REWRITE = "project_rewrite";
- private static final String LOGICAL = "logical";
- private static final String LOGICAL_REWRITE = "logical_rewrite";
-
- public static FlinkChainedProgram buildProgram(Configuration config) {
- FlinkChainedProgram chainedProgram = new FlinkChainedProgram();
-
- // rewrite sub-queries to joins
- chainedProgram.addLast(
- SUBQUERY_REWRITE,
- FlinkGroupProgramBuilder.newBuilder()
- // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_REF_RULES())
- .build(),
- "convert table references before rewriting sub-queries to" + " semi-join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.SEMI_JOIN_RULES())
- .build(),
- "rewrite sub-queries to semi-join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES())
- .build(),
- "sub-queries remove")
- // convert RelOptTableImpl (which exists in SubQuery before) to
- // FlinkRelOptTable
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.TABLE_REF_RULES())
- .build(),
- "convert table references after sub-queries removed")
- .build());
-
- // rewrite special temporal join plan
- chainedProgram.addLast(
- TEMPORAL_JOIN_REWRITE,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.EXPAND_PLAN_RULES())
- .build(),
- "convert correlate to temporal table join")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.POST_EXPAND_CLEAN_UP_RULES())
- .build(),
- "convert enumerable table scan")
- .build());
-
- // query decorrelation
- chainedProgram.addLast(
- DECORRELATE,
- FlinkGroupProgramBuilder.newBuilder()
- // rewrite before decorrelation
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PRE_DECORRELATION_RULES())
- .build(),
- "pre-rewrite before decorrelation")
- .addProgram(new FlinkDecorrelateProgram(), "")
- .build());
-
- // default rewrite, includes: predicate simplification, expression reduction, window
- // properties rewrite, etc.
- chainedProgram.addLast(
- DEFAULT_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.DEFAULT_REWRITE_RULES())
- .build());
-
- // rule based optimization: push down predicate(s) in where clause, so it only needs to read
- // the required data
- chainedProgram.addLast(
- PREDICATE_PUSHDOWN,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_PREDICATE_REWRITE_RULES())
- .build(),
- "join predicate rewrite")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES())
- .build(),
- "filter rules")
- .setIterations(5)
- .build(),
- "predicate rewrite")
- .addProgram(
- // PUSH_PARTITION_DOWN_RULES should always be in front of
- // PUSH_FILTER_DOWN_RULES
- // to prevent PUSH_FILTER_DOWN_RULES from consuming the predicates
- // in partitions
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PUSH_PARTITION_DOWN_RULES())
- .build(),
- "push down partitions into table scan")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(
- HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PUSH_FILTER_DOWN_RULES())
- .build(),
- "push down filters into table scan")
- .build(),
- "push predicate into table scan")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PRUNE_EMPTY_RULES())
- .build(),
- "prune empty after predicate push down")
- .build());
-
- // join reorder
- if (config.getBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
- chainedProgram.addLast(
- JOIN_REORDER,
- FlinkGroupProgramBuilder.newBuilder()
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_REORDER_PREPARE_RULES())
- .build(),
- "merge join into MultiJoin")
- .addProgram(
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.JOIN_REORDER_RULES())
- .build(),
- "do join reorder")
- .build());
- }
-
- // project rewrite
- chainedProgram.addLast(
- PROJECT_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.PROJECT_RULES())
- .build());
-
- // optimize the logical plan
- chainedProgram.addLast(
- LOGICAL,
- FlinkVolcanoProgramBuilder.newBuilder()
- .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES())
- .setRequiredOutputTraits(new Convention.Impl[] {FlinkConventions.LOGICAL()})
- .build());
-
- // logical rewrite
- chainedProgram.addLast(
- LOGICAL_REWRITE,
- FlinkHepRuleSetProgramBuilder.newBuilder()
- .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
- .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
- .add(FlinkStreamRuleSets.LOGICAL_REWRITE())
- .build());
-
- return chainedProgram;
- }
-}
diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/LineageContext.java b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/LineageContext.java
index 2c31236b97..d707ade42a 100644
--- a/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/LineageContext.java
+++ b/dinky-client/dinky-client-1.16/src/main/java/org/dinky/utils/LineageContext.java
@@ -27,23 +27,13 @@
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.SinkModifyOperation;
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
-import org.apache.flink.table.planner.calcite.RexFactory;
-import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
-import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
-import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
-import org.apache.flink.table.planner.plan.trait.MiniBatchInterval;
import java.util.ArrayList;
import java.util.List;
@@ -56,11 +46,9 @@
*/
public class LineageContext {
- private final FlinkChainedProgram flinkChainedProgram;
private final TableEnvironmentImpl tableEnv;
- public LineageContext(FlinkChainedProgram flinkChainedProgram, TableEnvironmentImpl tableEnv) {
- this.flinkChainedProgram = flinkChainedProgram;
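+ // the FlinkChainedProgram dependency is gone; lineage extraction now only
+ // needs the table environment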
+ public LineageContext(TableEnvironmentImpl tableEnv) {
this.tableEnv = tableEnv;
}
@@ -70,11 +58,8 @@ public List<LineageRel> getLineage(String statement) {