shipFiles) {
+ yarnClusterDescriptor.addShipFiles(shipFiles);
+ }
+
+ @Override
+ public KubernetesClusterDescriptor createKubernetesClusterDescriptor(
+ Configuration configuration, FlinkKubeClient flinkKubeClient) {
+ return new KubernetesClusterDescriptor(configuration, flinkKubeClient);
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/pom.xml b/dinky-client/dinky-client-1.19/pom.xml
new file mode 100644
index 0000000000..3ac2ab0376
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.dinky</groupId>
+        <artifactId>dinky-client</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>dinky-client-1.19</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Dinky : Client 1.19</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.dinky</groupId>
+            <artifactId>dinky-client-base</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.dinky</groupId>
+            <artifactId>dinky-flink-1.19</artifactId>
+            <scope>${scope.runtime}</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.xml.bind</groupId>
+            <artifactId>jaxb-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.xml.bind</groupId>
+            <artifactId>jaxb-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>${maven-jar-plugin.version}</version>
+                <configuration>
+                    <outputDirectory>${project.parent.parent.basedir}/build/extends</outputDirectory>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
new file mode 100644
index 0000000000..5fc8dc24cb
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/rel/metadata/RelColumnOrigin.java
@@ -0,0 +1,116 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptTable;
+
+/**
+ * Modified based on calcite's source code org.apache.calcite.rel.metadata.RelColumnOrigin
+ *
+ * Modification point:
+ *
+ * - add transform field and related code.
+ *
+ *
+ * @description: RelColumnOrigin is a data structure describing one of the origins of an
+ * output column produced by a relational expression.
+ * @author: HamaWhite
+ */
+public class RelColumnOrigin {
+ // ~ Instance fields --------------------------------------------------------
+
+ private final RelOptTable originTable;
+
+ private final int iOriginColumn;
+
+ private final boolean isDerived;
+
+ /**
+ * Stores the transform expression, i.e. which source table fields are converted
+ * into the target field, and by which expression.
+ */
+ private String transform;
+
+ // ~ Constructors -----------------------------------------------------------
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ }
+
+ public RelColumnOrigin(RelOptTable originTable, int iOriginColumn, boolean isDerived, String transform) {
+ this.originTable = originTable;
+ this.iOriginColumn = iOriginColumn;
+ this.isDerived = isDerived;
+ this.transform = transform;
+ }
+
+ // ~ Methods ----------------------------------------------------------------
+
+ /**
+ * Returns table of origin.
+ */
+ public RelOptTable getOriginTable() {
+ return originTable;
+ }
+
+ /**
+ * Returns the 0-based index of column in origin table; whether this ordinal
+ * is flattened or unflattened depends on whether UDT flattening has already
+ * been performed on the relational expression which produced this
+ * description.
+ */
+ public int getOriginColumnOrdinal() {
+ return iOriginColumn;
+ }
+
+ /**
+ * Consider the query <code>select a+b as c, d as e from t</code>. The
+ * output column c has two origins (a and b), both of them derived. The
+ * output column e has one origin (d), which is not derived.
+ *
+ * @return false if value taken directly from column in origin table; true
+ * otherwise
+ */
+ public boolean isDerived() {
+ return isDerived;
+ }
+
+ public String getTransform() {
+ return transform;
+ }
+
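+ // Note: transform is not part of equals() and hashCode(); only the origin table,
+ // column ordinal and isDerived are compared.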
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RelColumnOrigin)) {
+ return false;
+ }
+ RelColumnOrigin other = (RelColumnOrigin) obj;
+ return originTable.getQualifiedName().equals(other.originTable.getQualifiedName())
+ && (iOriginColumn == other.iOriginColumn)
+ && (isDerived == other.isDerived);
+ }
+
+ @Override
+ public int hashCode() {
+ return originTable.getQualifiedName().hashCode() + iOriginColumn + (isDerived ? 313 : 0);
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
new file mode 100644
index 0000000000..5c8aae002a
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnOrigins.java
@@ -0,0 +1,534 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.calcite.rel.metadata;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Match;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableFunctionScan;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.rex.RexVisitorImpl;
+import org.apache.calcite.util.BuiltInMethod;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Modified based on calcite's source code org.apache.calcite.rel.metadata.RelMdColumnOrigins
+ *
+ * Modification point:
+ *
+ * - Support lookup join, add method getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn)
+ *
+ * - Support watermark, add method getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn)
+ *
+ * - Support table function, add method getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn)
+ *
+ * - Support CEP, add method getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn)
+ *
+ * - Support transform, add method createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet, String transform, boolean originTransform), and related code
+ *
+ * - Support field AS LOCALTIMESTAMP, modify method getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ *
+ * - Support PROCTIME() as the first field, add method computeIndexWithOffset, used by getColumnOrigins(Project rel, RelMetadataQuery mq, int iOutputColumn)
+ *
+ *
+ * @description: RelMdColumnOrigins supplies a default implementation of {@link RelMetadataQuery#getColumnOrigins} for the standard logical algebra.
+ * @author: HamaWhite
+ */
+public class RelMdColumnOrigins implements MetadataHandler<BuiltInMetadata.ColumnOrigin> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RelMdColumnOrigins.class);
+
+ public static final String DELIMITER = ".";
+
+ public static final RelMetadataProvider SOURCE = ReflectiveRelMetadataProvider.reflectiveSource(
+ BuiltInMethod.COLUMN_ORIGIN.method, new RelMdColumnOrigins());
+
+ // ~ Constructors -----------------------------------------------------------
+
+ private RelMdColumnOrigins() {}
+
+ // ~ Methods ----------------------------------------------------------------
+
+ public MetadataDef<BuiltInMetadata.ColumnOrigin> getDef() {
+ return BuiltInMetadata.ColumnOrigin.DEF;
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(Aggregate rel, RelMetadataQuery mq, int iOutputColumn) {
+ if (iOutputColumn < rel.getGroupCount()) {
+ // get actual index of Group columns.
+ return mq.getColumnOrigins(
+ rel.getInput(), rel.getGroupSet().asList().get(iOutputColumn));
+ }
+
+ // Aggregate columns are derived from input columns
+ AggregateCall call = rel.getAggCallList().get(iOutputColumn - rel.getGroupCount());
+
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+ for (Integer iInput : call.getArgList()) {
+ Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(rel.getInput(), iInput);
+ inputSet = createDerivedColumnOrigins(inputSet, call.toString(), true);
+ if (inputSet != null) {
+ set.addAll(inputSet);
+ }
+ }
+ return set;
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(Join rel, RelMetadataQuery mq, int iOutputColumn) {
+ int nLeftColumns = rel.getLeft().getRowType().getFieldList().size();
+ Set<RelColumnOrigin> set;
+ boolean derived = false;
+ if (iOutputColumn < nLeftColumns) {
+ set = mq.getColumnOrigins(rel.getLeft(), iOutputColumn);
+ if (rel.getJoinType().generatesNullsOnLeft()) {
+ derived = true;
+ }
+ } else {
+ set = mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns);
+ if (rel.getJoinType().generatesNullsOnRight()) {
+ derived = true;
+ }
+ }
+ if (derived) {
+ // nulls are generated due to outer join; that counts
+ // as derivation
+ set = createDerivedColumnOrigins(set);
+ }
+ return set;
+ }
+
+ /**
+ * Support the field lineage of table functions
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq, int iOutputColumn) {
+
+ List<RelDataTypeField> leftFieldList = rel.getLeft().getRowType().getFieldList();
+
+ int nLeftColumns = leftFieldList.size();
+ Set<RelColumnOrigin> set;
+ if (iOutputColumn < nLeftColumns) {
+ set = mq.getColumnOrigins(rel.getLeft(), iOutputColumn);
+ } else {
+ if (rel.getRight() instanceof TableFunctionScan) {
+ // get the field name of the left table configured in the Table Function on the right
+ TableFunctionScan tableFunctionScan = (TableFunctionScan) rel.getRight();
+ RexCall rexCall = (RexCall) tableFunctionScan.getCall();
+ // support only one field in table function
+ RexFieldAccess rexFieldAccess =
+ (RexFieldAccess) rexCall.getOperands().get(0);
+ String fieldName = rexFieldAccess.getField().getName();
+
+ int leftFieldIndex = 0;
+ for (int i = 0; i < nLeftColumns; i++) {
+ if (leftFieldList.get(i).getName().equalsIgnoreCase(fieldName)) {
+ leftFieldIndex = i;
+ break;
+ }
+ }
+ /**
+ * Get the fields from the left table, don't go to
+ * getColumnOrigins(TableFunctionScan rel,RelMetadataQuery mq, int iOutputColumn),
+ * otherwise the return is null, and the UDTF field origin cannot be parsed
+ */
+ set = mq.getColumnOrigins(rel.getLeft(), leftFieldIndex);
+
+ // process transform for udtf
+ String transform = rexCall.toString().replace(rexFieldAccess.toString(), fieldName)
+ + DELIMITER
+ + tableFunctionScan.getRowType().getFieldNames().get(iOutputColumn - nLeftColumns);
+ set = createDerivedColumnOrigins(set, transform, false);
+ } else {
+ set = mq.getColumnOrigins(rel.getRight(), iOutputColumn - nLeftColumns);
+ }
+ }
+ return set;
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(SetOp rel, RelMetadataQuery mq, int iOutputColumn) {
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+ for (RelNode input : rel.getInputs()) {
+ Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, iOutputColumn);
+ if (inputSet == null) {
+ return Collections.emptySet();
+ }
+ set.addAll(inputSet);
+ }
+ return set;
+ }
+
+ /**
+ * Support the field lineage of lookup join
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+
+ /**
+ * Support the field lineage of watermark
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(SingleRel rel, RelMetadataQuery mq, int iOutputColumn) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+
+ /**
+ * Support new fields in the source table, such as those created with the LOCALTIMESTAMP function
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Project rel, final RelMetadataQuery mq, int iOutputColumn) {
+ final RelNode input = rel.getInput();
+ RexNode rexNode = rel.getProjects().get(iOutputColumn);
+
+ if (rexNode instanceof RexInputRef) {
+ // Direct reference: no derivation added.
+ RexInputRef inputRef = (RexInputRef) rexNode;
+ int index = inputRef.getIndex();
+ if (input instanceof TableScan) {
+ index = computeIndexWithOffset(rel.getProjects(), inputRef.getIndex(), iOutputColumn);
+ }
+ return mq.getColumnOrigins(input, index);
+ } else if (input instanceof TableScan
+ && rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty()) {
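+ // A call with no operands (e.g. LOCALTIMESTAMP, PROCTIME()) keeps its output position on the underlying scan.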
+ return mq.getColumnOrigins(input, iOutputColumn);
+ }
+ // Anything else is a derivation, possibly from multiple columns.
+ final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
+ return createDerivedColumnOrigins(set, rexNode.toString(), true);
+ }
+
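+ /**
+ * Zero-operand calls such as PROCTIME() occupy a projection slot without consuming an input
+ * field, so count them before the given output column and shift the base index accordingly.
+ */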
+ private int computeIndexWithOffset(List<RexNode> projects, int baseIndex, int iOutputColumn) {
+ int offset = 0;
+ for (int index = 0; index < iOutputColumn; index++) {
+ RexNode rexNode = projects.get(index);
+ if ((rexNode.getClass().equals(RexCall.class)
+ && ((RexCall) rexNode).getOperands().isEmpty())) {
+ offset += 1;
+ }
+ }
+ return baseIndex + offset;
+ }
+
+ /**
+ * Support the field lineage of CEP.
+ * The first column is the field after PARTITION BY, and the other columns come from the measures in Match
+ */
+ public Set<RelColumnOrigin> getColumnOrigins(Match rel, RelMetadataQuery mq, int iOutputColumn) {
+ int orderCount = rel.getOrderKeys().getKeys().size();
+
+ if (iOutputColumn < orderCount) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+ final RelNode input = rel.getInput();
+ RexNode rexNode = rel.getMeasures().values().asList().get(iOutputColumn - orderCount);
+
+ RexPatternFieldRef rexPatternFieldRef = searchRexPatternFieldRef(rexNode);
+ if (rexPatternFieldRef != null) {
+ final Set<RelColumnOrigin> set = mq.getColumnOrigins(input, rexPatternFieldRef.getIndex());
+ String originTransform = rexNode instanceof RexCall
+ ? ((RexCall) rexNode).getOperands().get(0).toString()
+ : null;
+ return createDerivedColumnOrigins(set, originTransform, true);
+ }
+ return Collections.emptySet();
+ }
+
+ private RexPatternFieldRef searchRexPatternFieldRef(RexNode rexNode) {
+ if (rexNode instanceof RexCall) {
+ RexNode operand = ((RexCall) rexNode).getOperands().get(0);
+ if (operand instanceof RexPatternFieldRef) {
+ return (RexPatternFieldRef) operand;
+ } else {
+ // recursive search
+ return searchRexPatternFieldRef(operand);
+ }
+ }
+ return null;
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(Calc rel, final RelMetadataQuery mq, int iOutputColumn) {
+ final RelNode input = rel.getInput();
+ final RexShuttle rexShuttle = new RexShuttle() {
+
+ @Override
+ public RexNode visitLocalRef(RexLocalRef localRef) {
+ return rel.getProgram().expandLocalRef(localRef);
+ }
+ };
+ final List<RexNode> projects = new ArrayList<>();
+ for (RexNode rex : rexShuttle.apply(rel.getProgram().getProjectList())) {
+ projects.add(rex);
+ }
+ final RexNode rexNode = projects.get(iOutputColumn);
+ if (rexNode instanceof RexInputRef) {
+ // Direct reference: no derivation added.
+ RexInputRef inputRef = (RexInputRef) rexNode;
+ return mq.getColumnOrigins(input, inputRef.getIndex());
+ }
+ // Anything else is a derivation, possibly from multiple columns.
+ final Set<RelColumnOrigin> set = getMultipleColumns(rexNode, input, mq);
+ return createDerivedColumnOrigins(set);
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(Filter rel, RelMetadataQuery mq, int iOutputColumn) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(Sort rel, RelMetadataQuery mq, int iOutputColumn) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(TableModify rel, RelMetadataQuery mq, int iOutputColumn) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(Exchange rel, RelMetadataQuery mq, int iOutputColumn) {
+ return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
+ }
+
+ public Set<RelColumnOrigin> getColumnOrigins(TableFunctionScan rel, RelMetadataQuery mq, int iOutputColumn) {
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+ Set<RelColumnMapping> mappings = rel.getColumnMappings();
+ if (mappings == null) {
+ if (!rel.getInputs().isEmpty()) {
+ // This is a non-leaf transformation: say we don't
+ // know about origins, because there are probably
+ // columns below.
+ return Collections.emptySet();
+ } else {
+ // This is a leaf transformation: say there are for sure no
+ // column origins.
+ return set;
+ }
+ }
+ for (RelColumnMapping mapping : mappings) {
+ if (mapping.iOutputColumn != iOutputColumn) {
+ continue;
+ }
+ final RelNode input = rel.getInputs().get(mapping.iInputRel);
+ final int column = mapping.iInputColumn;
+ Set<RelColumnOrigin> origins = mq.getColumnOrigins(input, column);
+ if (origins == null) {
+ return Collections.emptySet();
+ }
+ if (mapping.derived) {
+ origins = createDerivedColumnOrigins(origins);
+ }
+ set.addAll(origins);
+ }
+ return set;
+ }
+
+ // Catch-all rule when none of the others apply.
+ @SuppressWarnings("squid:S1172")
+ public Set<RelColumnOrigin> getColumnOrigins(RelNode rel, RelMetadataQuery mq, int iOutputColumn) {
+ // NOTE jvs 28-Mar-2006: We may get this wrong for a physical table
+ // expression which supports projections. In that case,
+ // it's up to the plugin writer to override with the
+ // correct information.
+
+ if (!rel.getInputs().isEmpty()) {
+ // No generic logic available for non-leaf rels.
+ return Collections.emptySet();
+ }
+
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+
+ RelOptTable table = rel.getTable();
+ if (table == null) {
+ // Somebody is making column values up out of thin air, like a
+ // VALUES clause, so we return an empty set.
+ return set;
+ }
+
+ // Detect the case where a physical table expression is performing
+ // projection, and say we don't know instead of making any assumptions.
+ // (Theoretically we could try to map the projection using column
+ // names.) This detection assumes the table expression doesn't handle
+ // rename as well.
+ if (table.getRowType() != rel.getRowType()) {
+ return Collections.emptySet();
+ }
+
+ set.add(new RelColumnOrigin(table, iOutputColumn, false));
+ return set;
+ }
+
+ private Set<RelColumnOrigin> createDerivedColumnOrigins(Set<RelColumnOrigin> inputSet) {
+ if (inputSet == null) {
+ return Collections.emptySet();
+ }
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+ for (RelColumnOrigin rco : inputSet) {
+ RelColumnOrigin derived = new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true);
+ set.add(derived);
+ }
+ return set;
+ }
+
+ private Set<RelColumnOrigin> createDerivedColumnOrigins(
+ Set<RelColumnOrigin> inputSet, String transform, boolean originTransform) {
+ if (inputSet == null || inputSet.isEmpty()) {
+ return Collections.emptySet();
+ }
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+
+ String finalTransform = originTransform ? computeTransform(inputSet, transform) : transform;
+ for (RelColumnOrigin rco : inputSet) {
+ RelColumnOrigin derived =
+ new RelColumnOrigin(rco.getOriginTable(), rco.getOriginColumnOrdinal(), true, finalTransform);
+ set.add(derived);
+ }
+ return set;
+ }
+
+ /**
+ * Replace each $-index variable (such as $0) in the transform with the real source field information.
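+ * For example, a hypothetical transform "$0 + $1" becomes "amount + fee" once each operand
+ * is mapped to its source field.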
+ */
+ private String computeTransform(Set<RelColumnOrigin> inputSet, String transform) {
+ LOG.debug("origin transform: {}", transform);
+ Pattern pattern = Pattern.compile("\\$\\d+");
+ Matcher matcher = pattern.matcher(transform);
+
+ Set<String> operandSet = new LinkedHashSet<>();
+ while (matcher.find()) {
+ operandSet.add(matcher.group());
+ }
+
+ if (operandSet.isEmpty()) {
+ LOG.info("operandSet is empty");
+ return null;
+ }
+ if (inputSet.size() != operandSet.size()) {
+ LOG.warn(
+ "The number [{}] of fields in the source tables are not equal to operands [{}]",
+ inputSet.size(),
+ operandSet.size());
+ return null;
+ }
+
+ Map<String, String> sourceColumnMap = new HashMap<>();
+ Iterator<String> iterator = optimizeSourceColumnSet(inputSet).iterator();
+ operandSet.forEach(e -> sourceColumnMap.put(e, iterator.next()));
+ LOG.debug("sourceColumnMap: {}", sourceColumnMap);
+
+ matcher = pattern.matcher(transform);
+ String temp;
+ while (matcher.find()) {
+ temp = matcher.group();
+ transform = transform.replace(temp, sourceColumnMap.get(temp));
+ }
+
+ // temporary special treatment
+ transform = transform.replace("_UTF-16LE", "");
+ LOG.debug("transform: {}", transform);
+ return transform;
+ }
+
+ /**
+ * Increase the readability of transform.
+ * If catalog, database and table are all the same, return only the field.
+ * If the catalog and database are the same, return the table and field.
+ * If only the catalog is the same, return the database, table and field.
+ * Otherwise, return all four parts.
+ */
+ private Set<String> optimizeSourceColumnSet(Set<RelColumnOrigin> inputSet) {
+ Set<String> catalogSet = new HashSet<>();
+ Set<String> databaseSet = new HashSet<>();
+ Set<String> tableSet = new HashSet<>();
+ Set<List<String>> qualifiedSet = new LinkedHashSet<>();
+ for (RelColumnOrigin rco : inputSet) {
+ RelOptTable originTable = rco.getOriginTable();
+ List<String> qualifiedName = originTable.getQualifiedName();
+
+ // catalog,database,table,field
+ List<String> qualifiedList = new ArrayList<>(qualifiedName);
+ catalogSet.add(qualifiedName.get(0));
+ databaseSet.add(qualifiedName.get(1));
+ tableSet.add(qualifiedName.get(2));
+
+ String field = rco.getTransform() != null
+ ? rco.getTransform()
+ : originTable.getRowType().getFieldNames().get(rco.getOriginColumnOrdinal());
+ qualifiedList.add(field);
+ qualifiedSet.add(qualifiedList);
+ }
+ if (catalogSet.size() == 1 && databaseSet.size() == 1 && tableSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> e.get(3));
+ } else if (catalogSet.size() == 1 && databaseSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(2, 4)));
+ } else if (catalogSet.size() == 1) {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e.subList(1, 4)));
+ } else {
+ return optimizeName(qualifiedSet, e -> String.join(DELIMITER, e));
+ }
+ }
+
+ private Set<String> optimizeName(Set<List<String>> qualifiedSet, Function<List<String>, String> mapper) {
+ return qualifiedSet.stream().map(mapper).collect(Collectors.toSet());
+ }
+
+ private Set<RelColumnOrigin> getMultipleColumns(RexNode rexNode, RelNode input, final RelMetadataQuery mq) {
+ final Set<RelColumnOrigin> set = new LinkedHashSet<>();
+ final RexVisitor<Void> visitor = new RexVisitorImpl<Void>(true) {
+
+ @Override
+ public Void visitInputRef(RexInputRef inputRef) {
+ Set<RelColumnOrigin> inputSet = mq.getColumnOrigins(input, inputRef.getIndex());
+ if (inputSet != null) {
+ set.addAll(inputSet);
+ }
+ return null;
+ }
+ };
+ rexNode.accept(visitor);
+ return set;
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/sql/SqlSelect.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/sql/SqlSelect.java
new file mode 100644
index 0000000000..d8824d0246
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/calcite/sql/SqlSelect.java
@@ -0,0 +1,355 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.calcite.sql;
+
+import org.dinky.context.CustomTableEnvironmentContext;
+import org.dinky.context.RowLevelPermissionsContext;
+import org.dinky.executor.ExtendedParser;
+
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A <code>SqlSelect</code> is a node of a parse tree which represents a select statement. It
+ * warrants its own node type just because we have a lot of methods to put somewhere.
+ *
+ * @description: Modify the value method of where to addCondition() to support row-level permission
+ * filtering
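+ *
+ * <p>For example, with a hypothetical permission rule "dept = 'sales'" registered for table
+ * orders, the query "SELECT * FROM orders WHERE amount > 10" is rewritten so that its WHERE
+ * clause becomes "amount > 10 AND dept = 'sales'".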
+ */
+public class SqlSelect extends SqlCall {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SqlSelect.class);
+
+ public static final int FROM_OPERAND = 2;
+ public static final int WHERE_OPERAND = 3;
+ public static final int HAVING_OPERAND = 5;
+
+ SqlNodeList keywordList;
+ SqlNodeList selectList;
+ SqlNode from;
+ SqlNode where;
+ SqlNodeList groupBy;
+ SqlNode having;
+ SqlNodeList windowDecls;
+ SqlNodeList orderBy;
+ SqlNode offset;
+ SqlNode fetch;
+ SqlNodeList hints;
+
+ public SqlSelect(
+ SqlParserPos pos,
+ SqlNodeList keywordList,
+ SqlNodeList selectList,
+ SqlNode from,
+ SqlNode where,
+ SqlNodeList groupBy,
+ SqlNode having,
+ SqlNodeList windowDecls,
+ SqlNodeList orderBy,
+ SqlNode offset,
+ SqlNode fetch,
+ SqlNodeList hints) {
+ super(pos);
+ this.keywordList = Objects.requireNonNull(keywordList != null ? keywordList : new SqlNodeList(pos));
+ this.selectList = selectList;
+ this.from = from;
+ this.groupBy = groupBy;
+ this.having = having;
+ this.windowDecls = Objects.requireNonNull(windowDecls != null ? windowDecls : new SqlNodeList(pos));
+ this.orderBy = orderBy;
+ this.offset = offset;
+ this.fetch = fetch;
+ this.hints = hints;
+
+ // add row level filter condition for where clause
+ this.where = addCondition(from, where, false);
+ }
+
+ /** The main process of controlling row-level permissions */
+ private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {
+ if (from instanceof SqlIdentifier) {
+ String tableName = from.toString();
+ // the table name is used as an alias for join
+ String tableAlias = fromJoin ? tableName : null;
+ return addPermission(where, tableName, tableAlias);
+ } else if (from instanceof SqlJoin) {
+ SqlJoin sqlJoin = (SqlJoin) from;
+ // support recursive processing, such as join for three tables, process left sqlNode
+ where = addCondition(sqlJoin.getLeft(), where, true);
+ // process right sqlNode
+ return addCondition(sqlJoin.getRight(), where, true);
+ } else if (from instanceof SqlBasicCall) {
+ // Table has an alias or comes from a subquery
+ List<SqlNode> operandList = ((SqlBasicCall) from).getOperandList();
+ /**
+ * If there is a subquery in the Join, row-level filtering has been appended to the
+ * subquery. What is returned here is the SqlSelect type, just return the original where
+ * directly
+ */
+ if (!(operandList.get(0) instanceof SqlIdentifier)) {
+ return where;
+ }
+ String tableName = operandList.get(0).toString();
+ String tableAlias = operandList.get(1).toString();
+ return addPermission(where, tableName, tableAlias);
+ }
+ return where;
+ }
+
+ /** Add row-level filtering based on user-configured permission points */
+ private SqlNode addPermission(SqlNode where, String tableName, String tableAlias) {
+ SqlBasicCall permissions = null;
+ ConcurrentHashMap permissionsMap = RowLevelPermissionsContext.get();
+ if (permissionsMap != null) {
+ String permissionsStatement = permissionsMap.get(tableName);
+ if (permissionsStatement != null && !"".equals(permissionsStatement)) {
+ if (CustomTableEnvironmentContext.get().getParser() instanceof ExtendedParser) {
+ ExtendedParser extendedParser =
+ (ExtendedParser) CustomTableEnvironmentContext.get().getParser();
+ permissions =
+ (SqlBasicCall) (extendedParser.getCustomParser()).parseExpression(permissionsStatement);
+ } else {
+ throw new RuntimeException("CustomParser is not set");
+ }
+ }
+ }
+
+ // add an alias
+ if (permissions != null && tableAlias != null) {
+ ImmutableList<String> namesList = ImmutableList.of(
+ tableAlias, permissions.getOperandList().get(0).toString());
+ permissions.getOperandList().set(0, new SqlIdentifier(namesList, null, new SqlParserPos(0, 0), null));
+ }
+
+ return buildWhereClause(where, permissions);
+ }
+
+ /** Rebuild the where clause */
+ private SqlNode buildWhereClause(SqlNode where, SqlBasicCall permissions) {
+ if (permissions != null) {
+ if (where == null) {
+ return permissions;
+ }
+ SqlBinaryOperator sqlBinaryOperator =
+ new SqlBinaryOperator(SqlKind.AND.name(), SqlKind.AND, 0, true, null, null, null);
+ SqlNode[] operands = new SqlNode[2];
+ operands[0] = where;
+ operands[1] = permissions;
+ SqlParserPos sqlParserPos = new SqlParserPos(0, 0);
+ return new SqlBasicCall(sqlBinaryOperator, operands, sqlParserPos);
+ }
+ return where;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return SqlSelectOperator.INSTANCE;
+ }
+
+ @Override
+ public SqlKind getKind() {
+ return SqlKind.SELECT;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(
+ keywordList, selectList, from, where, groupBy, having, windowDecls, orderBy, offset, fetch, hints);
+ }
+
+ @Override
+ public void setOperand(int i, SqlNode operand) {
+ switch (i) {
+ case 0:
+ keywordList = Objects.requireNonNull((SqlNodeList) operand);
+ break;
+ case 1:
+ selectList = (SqlNodeList) operand;
+ break;
+ case 2:
+ from = operand;
+ break;
+ case 3:
+ where = operand;
+ break;
+ case 4:
+ groupBy = (SqlNodeList) operand;
+ break;
+ case 5:
+ having = operand;
+ break;
+ case 6:
+ windowDecls = Objects.requireNonNull((SqlNodeList) operand);
+ break;
+ case 7:
+ orderBy = (SqlNodeList) operand;
+ break;
+ case 8:
+ offset = operand;
+ break;
+ case 9:
+ fetch = operand;
+ break;
+ default:
+ throw new AssertionError(i);
+ }
+ }
+
+ public final boolean isDistinct() {
+ return getModifierNode(SqlSelectKeyword.DISTINCT) != null;
+ }
+
+ public final SqlNode getModifierNode(SqlSelectKeyword modifier) {
+ for (SqlNode keyword : keywordList) {
+ SqlSelectKeyword keyword2 = ((SqlLiteral) keyword).symbolValue(SqlSelectKeyword.class);
+ if (keyword2 == modifier) {
+ return keyword;
+ }
+ }
+ return null;
+ }
+
+ public final SqlNode getFrom() {
+ return from;
+ }
+
+ public void setFrom(SqlNode from) {
+ this.from = from;
+ }
+
+ public final SqlNodeList getGroup() {
+ return groupBy;
+ }
+
+ public void setGroupBy(SqlNodeList groupBy) {
+ this.groupBy = groupBy;
+ }
+
+ public final SqlNode getHaving() {
+ return having;
+ }
+
+ public void setHaving(SqlNode having) {
+ this.having = having;
+ }
+
+ public final SqlNodeList getSelectList() {
+ return selectList;
+ }
+
+ public void setSelectList(SqlNodeList selectList) {
+ this.selectList = selectList;
+ }
+
+ public final SqlNode getWhere() {
+ return where;
+ }
+
+ public void setWhere(SqlNode whereClause) {
+ this.where = whereClause;
+ }
+
+ @Nonnull
+ public final SqlNodeList getWindowList() {
+ return windowDecls;
+ }
+
+ public final SqlNodeList getOrderList() {
+ return orderBy;
+ }
+
+ public void setOrderBy(SqlNodeList orderBy) {
+ this.orderBy = orderBy;
+ }
+
+ public final SqlNode getOffset() {
+ return offset;
+ }
+
+ public void setOffset(SqlNode offset) {
+ this.offset = offset;
+ }
+
+ public final SqlNode getFetch() {
+ return fetch;
+ }
+
+ public void setFetch(SqlNode fetch) {
+ this.fetch = fetch;
+ }
+
+ public void setHints(SqlNodeList hints) {
+ this.hints = hints;
+ }
+
+ public SqlNodeList getHints() {
+ return this.hints;
+ }
+
+ public boolean hasHints() {
+ // The hints may be passed as null explicitly.
+ return this.hints != null && this.hints.size() > 0;
+ }
+
+ @Override
+ public void validate(SqlValidator validator, SqlValidatorScope scope) {
+ validator.validateQuery(this, scope, validator.getUnknownType());
+ }
+
+ /** Override SqlCall, to introduce a sub-query frame. */
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ if (!writer.inQuery()) {
+ // If this SELECT is the topmost item in a sub-query, introduce a new
+ // frame. (The topmost item in the sub-query might be a UNION or
+ // ORDER. In this case, we don't need a wrapper frame.)
+ final SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.SUB_QUERY, "(", ")");
+ writer.getDialect().unparseCall(writer, this, 0, 0);
+ writer.endList(frame);
+ } else {
+ writer.getDialect().unparseCall(writer, this, leftPrec, rightPrec);
+ }
+ }
+
+ public boolean hasOrderBy() {
+ return orderBy != null && orderBy.size() != 0;
+ }
+
+ public boolean hasWhere() {
+ return where != null;
+ }
+
+ public boolean isKeywordPresent(SqlSelectKeyword targetKeyWord) {
+ return getModifierNode(targetKeyWord) != null;
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java
new file mode 100644
index 0000000000..6414271715
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/AbstractCustomTableEnvironment.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.dinky.data.model.LineageRel;
+import org.dinky.data.result.SqlExplainResult;
+import org.dinky.utils.LineageContext;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.operations.ExplainOperation;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.List;
+
+import cn.hutool.core.util.ReflectUtil;
+
+/** Base class for Dinky's custom table environments, delegating to a wrapped StreamTableEnvironment. */
+public abstract class AbstractCustomTableEnvironment
+ implements CustomTableEnvironment, DefaultTableEnvironmentInternal, DefaultStreamTableEnvironment {
+
+ protected StreamTableEnvironment streamTableEnvironment;
+ protected ClassLoader userClassLoader;
+
+ protected AbstractCustomTableEnvironment() {}
+
+ protected AbstractCustomTableEnvironment(StreamTableEnvironment streamTableEnvironment) {
+ this.streamTableEnvironment = streamTableEnvironment;
+ }
+
+ @Override
+ public TableEnvironment getTableEnvironment() {
+ return streamTableEnvironment;
+ }
+
+ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
+ return ((StreamTableEnvironmentImpl) streamTableEnvironment).execEnv();
+ }
+
+ @Override
+ public ClassLoader getUserClassLoader() {
+ return userClassLoader;
+ }
+
+ public Planner getPlanner() {
+ return ((StreamTableEnvironmentImpl) streamTableEnvironment).getPlanner();
+ }
+
+ @Override
+ public void injectParser(CustomParser parser) {
+ ReflectUtil.setFieldValue(getPlanner(), "parser", new ParserWrapper(parser));
+ }
+
+ @Override
+ public void injectExtendedExecutor(CustomExtendedOperationExecutor extendedExecutor) {}
+
+ @Override
+ public Configuration getRootConfiguration() {
+ return (Configuration) this.getConfig().getRootConfiguration();
+ }
+
+ @Override
+ public SqlExplainResult explainSqlRecord(String statement, ExplainDetail... extraDetails) {
+ List<Operation> operations = getParser().parse(statement);
+ if (operations.size() != 1) {
+ throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+ }
+
+ Operation operation = operations.get(0);
+ SqlExplainResult data = new SqlExplainResult();
+ data.setParseTrue(true);
+ data.setExplainTrue(true);
+
+ if (operation instanceof ModifyOperation) {
+ data.setType("Modify DML");
+ } else if (operation instanceof ExplainOperation) {
+ data.setType("Explain DML");
+ } else if (operation instanceof QueryOperation) {
+ data.setType("Query DML");
+ } else {
+ data.setExplain(operation.asSummaryString());
+ data.setType("DDL");
+
+ // data.setExplain("DDL statement needn't comment。");
+ return data;
+ }
+
+ data.setExplain(getPlanner().explain(operations, ExplainFormat.TEXT, extraDetails));
+ return data;
+ }
+
+ @Override
+ public List<LineageRel> getLineage(String statement) {
+ LineageContext lineageContext = new LineageContext(this);
+ return lineageContext.analyzeLineage(statement);
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ClusterDescriptorAdapterImpl.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ClusterDescriptorAdapterImpl.java
new file mode 100644
index 0000000000..14f7504bd1
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ClusterDescriptorAdapterImpl.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ClusterDescriptorAdapterImpl extends ClusterDescriptorAdapter {
+
+ public ClusterDescriptorAdapterImpl() {}
+
+ public ClusterDescriptorAdapterImpl(YarnClusterDescriptor yarnClusterDescriptor) {
+ super(yarnClusterDescriptor);
+ }
+
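+ /** Flink 1.19's YarnClusterDescriptor ships files as Hadoop Paths, so the File list is converted first. */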
+ @Override
+ public void addShipFiles(List<File> shipFiles) {
+ yarnClusterDescriptor.addShipFiles(shipFiles.stream()
+ .map(file -> new Path(file.getPath()))
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public KubernetesClusterDescriptor createKubernetesClusterDescriptor(
+ Configuration configuration, FlinkKubeClient flinkKubeClient) {
+ return new KubernetesClusterDescriptor(configuration, FlinkKubeClientFactory.getInstance());
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
new file mode 100644
index 0000000000..74c58c0d64
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableEnvironmentImpl.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.dinky.operations.CustomNewParserImpl;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.JSONGenerator;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.CachedPlan;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogDescriptor;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * CustomTableEnvironmentImpl
+ *
+ * @since 2022/05/08
+ */
+public class CustomTableEnvironmentImpl extends AbstractCustomTableEnvironment {
+
+ private static final Logger log = LoggerFactory.getLogger(CustomTableEnvironmentImpl.class);
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ public CustomTableEnvironmentImpl(StreamTableEnvironment streamTableEnvironment) {
+ super(streamTableEnvironment);
+ injectParser(new CustomNewParserImpl(this, getPlanner().getParser()));
+ }
+
+ public static CustomTableEnvironmentImpl create(
+ StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
+ return create(
+ executionEnvironment,
+ EnvironmentSettings.newInstance().withClassLoader(classLoader).build());
+ }
+
+ public static CustomTableEnvironmentImpl createBatch(
+ StreamExecutionEnvironment executionEnvironment, ClassLoader classLoader) {
+ return create(
+ executionEnvironment,
+ EnvironmentSettings.newInstance()
+ .withClassLoader(classLoader)
+ .inBatchMode()
+ .build());
+ }
+
+ public static CustomTableEnvironmentImpl create(
+ StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
+ StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);
+
+ return new CustomTableEnvironmentImpl(streamTableEnvironment);
+ }
+
+ @Override
+ public ObjectNode getStreamGraph(String statement) {
+ List<Operation> operations = super.getParser().parse(statement);
+ if (operations.size() != 1) {
+ throw new TableException("Unsupported SQL query! explainSql() only accepts a single SQL query.");
+ }
+
+ List<ModifyOperation> modifyOperations = operations.stream()
+ .filter(ModifyOperation.class::isInstance)
+ .map(ModifyOperation.class::cast)
+ .collect(Collectors.toList());
+
+ StreamGraph streamGraph = transOperationsToStreamGraph(modifyOperations);
+ JSONGenerator jsonGenerator = new JSONGenerator(streamGraph);
+ try {
+ return (ObjectNode) mapper.readTree(jsonGenerator.getJSON());
+ } catch (JsonProcessingException e) {
+ log.error("read streamGraph configure error: ", e);
+ return mapper.createObjectNode();
+ }
+ }
+
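+ /** Translates the modify operations into Transformations and assembles them into a StreamGraph. */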
+ private StreamGraph transOperationsToStreamGraph(List<ModifyOperation> modifyOperations) {
+ List<Transformation<?>> trans = getPlanner().translate(modifyOperations);
+ final StreamExecutionEnvironment environment = getStreamExecutionEnvironment();
+ trans.forEach(environment::addOperator);
+
+ StreamGraph streamGraph = environment.getStreamGraph();
+ final Configuration configuration = getConfig().getConfiguration();
+ if (configuration.containsKey(PipelineOptions.NAME.key())) {
+ streamGraph.setJobName(configuration.getString(PipelineOptions.NAME));
+ }
+ return streamGraph;
+ }
+
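+ /** Builds the JSON plan of the job graph compiled from the given insert statements. */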
+ @Override
+ public JobPlanInfo getJobPlanInfo(List<String> statements) {
+ return new JobPlanInfo(JsonPlanGenerator.generatePlan(getJobGraphFromInserts(statements)));
+ }
+
+ @Override
+ public StreamGraph getStreamGraphFromInserts(List<String> statements) {
+ List<ModifyOperation> modifyOperations = new ArrayList<>();
+ statements.stream().map(statement -> getParser().parse(statement)).forEach(operations -> {
+ if (operations.size() != 1) {
+ throw new TableException("Only single statement is supported.");
+ }
+ Operation operation = operations.get(0);
+ if (operation instanceof ModifyOperation) {
+ modifyOperations.add((ModifyOperation) operation);
+ } else {
+ throw new TableException("Only insert statement is supported now.");
+ }
+ });
+
+ return transOperationsToStreamGraph(modifyOperations);
+ }
+
+ @Override
+ public void createCatalog(String catalogName, CatalogDescriptor catalogDescriptor) {
+ getCatalogManager().createCatalog(catalogName, catalogDescriptor);
+ }
+
+ @Override
+ public TableResultInternal executeCachedPlanInternal(CachedPlan cachedPlan) {
+
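+ // Cached plan execution is not supported by this custom environment.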
+ return null;
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableResultImpl.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableResultImpl.java
new file mode 100644
index 0000000000..3e1f1953d0
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/CustomTableResultImpl.java
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.dinky.data.model.TableSchemaField;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.CachedPlan;
+import org.apache.flink.table.api.internal.ResultProvider;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.table.utils.print.TableauStyle;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.annotation.Nullable;
+
+/** Implementation for {@link TableResult}. */
+@Internal
+public class CustomTableResultImpl implements TableResultInternal {
+
+ public static final TableResult TABLE_RESULT_OK = CustomTableResultImpl.builder()
+ .resultKind(ResultKind.SUCCESS)
+ .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+ .data(Collections.singletonList(Row.of("OK")))
+ .build();
+
+ private final JobClient jobClient;
+ private final ResolvedSchema resolvedSchema;
+ private final ResultKind resultKind;
+ private final ResultProvider resultProvider;
+ private final PrintStyle printStyle;
+
+ private CustomTableResultImpl(
+ @Nullable JobClient jobClient,
+ ResolvedSchema resolvedSchema,
+ ResultKind resultKind,
+ ResultProvider resultProvider,
+ PrintStyle printStyle) {
+ this.jobClient = jobClient;
+ this.resolvedSchema = Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
+ this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind should not be null");
+ Preconditions.checkNotNull(resultProvider, "result provider should not be null");
+ this.resultProvider = resultProvider;
+ this.printStyle = Preconditions.checkNotNull(printStyle, "printStyle should not be null");
+ }
+
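+ /** Builds a SUCCESS TableResult whose physical schema and data come from the given fields and rows. */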
+ public static TableResult buildTableResult(List<TableSchemaField> fields, List<Row> rows) {
+ Builder builder = builder().resultKind(ResultKind.SUCCESS);
+ if (fields.size() > 0) {
+ List<String> columnNames = new ArrayList<>();
+ List<DataType> columnTypes = new ArrayList<>();
+ for (int i = 0; i < fields.size(); i++) {
+ columnNames.add(fields.get(i).getName());
+ columnTypes.add(fields.get(i).getType());
+ }
+ builder.schema(ResolvedSchema.physical(columnNames, columnTypes)).data(rows);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public Optional<JobClient> getJobClient() {
+ return Optional.ofNullable(jobClient);
+ }
+
+ @Override
+ public void await() throws InterruptedException, ExecutionException {
+ try {
+ awaitInternal(-1, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // do nothing
+ }
+ }
+
+ @Override
+ public void await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ awaitInternal(timeout, unit);
+ }
+
+ private void awaitInternal(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if (jobClient == null) {
+ return;
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(1, r -> new Thread(r, "TableResult-await-thread"));
+ try {
+ CompletableFuture<Void> future = CompletableFuture.runAsync(
+ () -> {
+ while (!resultProvider.isFirstRowReady()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new TableException("Thread is interrupted");
+ }
+ }
+ },
+ executor);
+
+ if (timeout >= 0) {
+ future.get(timeout, unit);
+ } else {
+ future.get();
+ }
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ @Override
+ public ResolvedSchema getResolvedSchema() {
+ return resolvedSchema;
+ }
+
+ @Override
+ public ResultKind getResultKind() {
+ return resultKind;
+ }
+
+ @Override
+ public CloseableIterator<Row> collect() {
+ return resultProvider.toExternalIterator();
+ }
+
+ @Override
+ public CloseableIterator<RowData> collectInternal() {
+ return resultProvider.toInternalIterator();
+ }
+
+ @Override
+ public RowDataToStringConverter getRowDataToStringConverter() {
+ return resultProvider.getRowDataStringConverter();
+ }
+
+ @Nullable
+ public CachedPlan getCachedPlan() {
+ return null;
+ }
+
+ @Override
+ public void print() {
+ Iterator<RowData> it = resultProvider.toInternalIterator();
+ printStyle.print(it, new PrintWriter(System.out));
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Builder for creating a {@link CustomTableResultImpl}. */
+ public static class Builder {
+
+ private JobClient jobClient = null;
+ private ResolvedSchema resolvedSchema = null;
+ private ResultKind resultKind = null;
+ private ResultProvider resultProvider = null;
+ private PrintStyle printStyle = null;
+
+ private Builder() {}
+
+ /**
+ * Specifies job client which associates the submitted Flink job.
+ *
+ * @param jobClient a {@link JobClient} for the submitted Flink job.
+ */
+ public Builder jobClient(JobClient jobClient) {
+ this.jobClient = jobClient;
+ return this;
+ }
+
+ /**
+ * Specifies schema of the execution result.
+ *
+ * @param resolvedSchema a {@link ResolvedSchema} for the execution result.
+ */
+ public Builder schema(ResolvedSchema resolvedSchema) {
+ Preconditions.checkNotNull(resolvedSchema, "resolvedSchema should not be null");
+ this.resolvedSchema = resolvedSchema;
+ return this;
+ }
+
+ /**
+ * Specifies result kind of the execution result.
+ *
+ * @param resultKind a {@link ResultKind} for the execution result.
+ */
+ public Builder resultKind(ResultKind resultKind) {
+ Preconditions.checkNotNull(resultKind, "resultKind should not be null");
+ this.resultKind = resultKind;
+ return this;
+ }
+
+ public Builder resultProvider(ResultProvider resultProvider) {
+ Preconditions.checkNotNull(resultProvider, "resultProvider should not be null");
+ this.resultProvider = resultProvider;
+ return this;
+ }
+
+ /**
+ * Specifies a row list as the execution result.
+ *
+ * @param rowList a row list as the execution result.
+ */
+ public Builder data(List<Row> rowList) {
+ Preconditions.checkNotNull(rowList, "listRows should not be null");
+ this.resultProvider = new StaticResultProvider(rowList);
+ return this;
+ }
+
+ /** Specifies print style. Default is {@link TableauStyle} with max integer column width. */
+ public Builder setPrintStyle(PrintStyle printStyle) {
+ Preconditions.checkNotNull(printStyle, "printStyle should not be null");
+ this.printStyle = printStyle;
+ return this;
+ }
+
+ /** Returns a {@link TableResult} instance. */
+ public TableResultInternal build() {
+ if (printStyle == null) {
+ printStyle = PrintStyle.rawContent(resultProvider.getRowDataStringConverter());
+ }
+ return new CustomTableResultImpl(jobClient, resolvedSchema, resultKind, resultProvider, printStyle);
+ }
+ }
+}
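For reference, a minimal usage sketch of the builder above; the schema, result kind, and row values are illustrative and not part of this patch:

    import java.util.Collections;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.ResultKind;
    import org.apache.flink.table.api.internal.TableResultInternal;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.types.Row;

    ResolvedSchema schema = ResolvedSchema.of(Column.physical("result", DataTypes.STRING()));
    TableResultInternal result = CustomTableResultImpl.builder()
            .schema(schema)
            .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
            .data(Collections.singletonList(Row.of("OK")))
            .build();
    result.print(); // renders via the default raw-content PrintStyle, since none was set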
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultStreamTableEnvironment.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultStreamTableEnvironment.java
new file mode 100644
index 0000000000..b7ef64ccba
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultStreamTableEnvironment.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamStatementSet;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.TableAggregateFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.types.Row;
+
+/** A {@link StreamTableEnvironment} that delegates all calls to the wrapped environment returned by {@link #getStreamTableEnvironment()}. */
+public interface DefaultStreamTableEnvironment
+ extends StreamTableEnvironment, DefaultTableEnvironment, TableEnvironmentInstance {
+
+ default StreamTableEnvironment getStreamTableEnvironment() {
+ return (StreamTableEnvironment) getTableEnvironment();
+ }
+
+ @Override // region StreamTableEnvironment interface
+ default <T> void registerFunction(String s, TableFunction<T> tableFunction) {
+ getStreamTableEnvironment().registerFunction(s, tableFunction);
+ }
+
+ @Override
+ default <T, ACC> void registerFunction(String s, AggregateFunction<T, ACC> aggregateFunction) {
+ getStreamTableEnvironment().registerFunction(s, aggregateFunction);
+ }
+
+ @Override
+ default <T, ACC> void registerFunction(String s, TableAggregateFunction<T, ACC> tableAggregateFunction) {
+ getStreamTableEnvironment().registerFunction(s, tableAggregateFunction);
+ }
+
+ @Override
+ default <T> Table fromDataStream(DataStream<T> dataStream) {
+ return getStreamTableEnvironment().fromDataStream(dataStream);
+ }
+
+ @Override
+ default <T> Table fromDataStream(DataStream<T> dataStream, Schema schema) {
+ return getStreamTableEnvironment().fromDataStream(dataStream, schema);
+ }
+
+ @Override
+ default Table fromChangelogStream(DataStream<Row> dataStream) {
+ return getStreamTableEnvironment().fromChangelogStream(dataStream);
+ }
+
+ @Override
+ default Table fromChangelogStream(DataStream<Row> dataStream, Schema schema) {
+ return getStreamTableEnvironment().fromChangelogStream(dataStream, schema);
+ }
+
+ @Override
+ default Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode) {
+ return getStreamTableEnvironment().fromChangelogStream(dataStream, schema, changelogMode);
+ }
+
+ @Override
+ default <T> void createTemporaryView(String s, DataStream<T> dataStream) {
+ getStreamTableEnvironment().createTemporaryView(s, dataStream);
+ }
+
+ @Override
+ default <T> void createTemporaryView(String s, DataStream<T> dataStream, Schema schema) {
+ getStreamTableEnvironment().createTemporaryView(s, dataStream, schema);
+ }
+
+ @Override
+ default DataStream<Row> toDataStream(Table table) {
+ return getStreamTableEnvironment().toDataStream(table);
+ }
+
+ @Override
+ default <T> DataStream<T> toDataStream(Table table, Class<T> aClass) {
+ return getStreamTableEnvironment().toDataStream(table, aClass);
+ }
+
+ @Override
+ default <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> abstractDataType) {
+ return getStreamTableEnvironment().toDataStream(table, abstractDataType);
+ }
+
+ @Override
+ default DataStream<Row> toChangelogStream(Table table) {
+ return getStreamTableEnvironment().toChangelogStream(table);
+ }
+
+ @Override
+ default DataStream<Row> toChangelogStream(Table table, Schema schema) {
+ return getStreamTableEnvironment().toChangelogStream(table, schema);
+ }
+
+ @Override
+ default DataStream<Row> toChangelogStream(Table table, Schema schema, ChangelogMode changelogMode) {
+ return getStreamTableEnvironment().toChangelogStream(table, schema, changelogMode);
+ }
+
+ @Override
+ default StreamStatementSet createStatementSet() {
+ return getStreamTableEnvironment().createStatementSet();
+ }
+
+ @Override
+ default <T> Table fromDataStream(DataStream<T> dataStream, Expression... expressions) {
+ return getStreamTableEnvironment().fromDataStream(dataStream, expressions);
+ }
+
+ @Override
+ default <T> void registerDataStream(String s, DataStream<T> dataStream) {
+ getStreamTableEnvironment().registerDataStream(s, dataStream);
+ }
+
+ @Override
+ default <T> void createTemporaryView(String s, DataStream<T> dataStream, Expression... expressions) {
+ getStreamTableEnvironment().createTemporaryView(s, dataStream, expressions);
+ }
+
+ @Override
+ default <T> DataStream<T> toAppendStream(Table table, Class<T> aClass) {
+ return getStreamTableEnvironment().toAppendStream(table, aClass);
+ }
+
+ @Override
+ default <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInformation) {
+ return getStreamTableEnvironment().toAppendStream(table, typeInformation);
+ }
+
+ @Override
+ default <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> aClass) {
+ return getStreamTableEnvironment().toRetractStream(table, aClass);
+ }
+
+ @Override
+ default <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInformation) {
+ return getStreamTableEnvironment().toRetractStream(table, typeInformation);
+ }
+
+ // endregion
+
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultTableEnvironment.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultTableEnvironment.java
new file mode 100644
index 0000000000..dcfdc22b65
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultTableEnvironment.java
@@ -0,0 +1,342 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
+import org.apache.flink.table.api.PlanReference;
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableDescriptor;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.module.Module;
+import org.apache.flink.table.module.ModuleEntry;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.table.types.AbstractDataType;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link TableEnvironment} that delegates all calls to the wrapped environment returned by {@link #getTableEnvironment()}. */
+public interface DefaultTableEnvironment extends TableEnvironment, TableEnvironmentInstance {
+ @Override
+ default Table fromValues(Object... values) {
+ return TableEnvironment.super.fromValues(values);
+ }
+
+ @Override
+ default Table fromValues(AbstractDataType<?> rowType, Object... values) {
+ return TableEnvironment.super.fromValues(rowType, values);
+ }
+
+ @Override
+ default void createFunction(String path, String className, List<ResourceUri> resourceUris) {
+ getTableEnvironment().createFunction(path, className, resourceUris);
+ }
+
+ @Override
+ default void createFunction(String path, String className, List<ResourceUri> resourceUris, boolean ignoreIfExists) {
+ getTableEnvironment().createFunction(path, className, resourceUris, ignoreIfExists);
+ }
+
+ @Override
+ default void createTemporaryFunction(String path, String className, List<ResourceUri> resourceUris) {
+ getTableEnvironment().createTemporaryFunction(path, className, resourceUris);
+ }
+
+ @Override
+ default void createTemporarySystemFunction(String name, String className, List<ResourceUri> resourceUris) {
+ getTableEnvironment().createTemporarySystemFunction(name, className, resourceUris);
+ }
+
+ @Override
+ default String explainSql(String statement, ExplainFormat format, ExplainDetail... extraDetails) {
+ return getTableEnvironment().explainSql(statement, format, extraDetails);
+ }
+
+ @Override
+ default TableResult executePlan(PlanReference planReference) throws TableException {
+ return TableEnvironment.super.executePlan(planReference);
+ }
+
+ ///
+ @Override
+ default Table fromValues(Expression... expressions) {
+ return getTableEnvironment().fromValues(expressions);
+ }
+
+ @Override
+ default Table fromValues(AbstractDataType<?> abstractDataType, Expression... expressions) {
+ return getTableEnvironment().fromValues(abstractDataType, expressions);
+ }
+
+ @Override
+ default Table fromValues(Iterable<?> iterable) {
+ return getTableEnvironment().fromValues(iterable);
+ }
+
+ @Override
+ default Table fromValues(AbstractDataType<?> abstractDataType, Iterable<?> iterable) {
+ return getTableEnvironment().fromValues(abstractDataType, iterable);
+ }
+
+ @Override
+ default void registerCatalog(String s, Catalog catalog) {
+ getTableEnvironment().registerCatalog(s, catalog);
+ }
+
+ @Override
+ default Optional<Catalog> getCatalog(String s) {
+ return getTableEnvironment().getCatalog(s);
+ }
+
+ @Override
+ default void loadModule(String s, Module module) {
+ getTableEnvironment().loadModule(s, module);
+ }
+
+ @Override
+ default void useModules(String... strings) {
+ getTableEnvironment().useModules(strings);
+ }
+
+ @Override
+ default void unloadModule(String s) {
+ getTableEnvironment().unloadModule(s);
+ }
+
+ @Override
+ default void registerFunction(String s, ScalarFunction scalarFunction) {
+ getTableEnvironment().registerFunction(s, scalarFunction);
+ }
+
+ @Override
+ default void createTemporarySystemFunction(String s, Class<? extends UserDefinedFunction> aClass) {
+ getTableEnvironment().createTemporarySystemFunction(s, aClass);
+ }
+
+ @Override
+ default void createTemporarySystemFunction(String s, UserDefinedFunction userDefinedFunction) {
+ getTableEnvironment().createTemporarySystemFunction(s, userDefinedFunction);
+ }
+
+ @Override
+ default boolean dropTemporarySystemFunction(String s) {
+ return getTableEnvironment().dropTemporarySystemFunction(s);
+ }
+
+ @Override
+ default void createFunction(String s, Class<? extends UserDefinedFunction> aClass) {
+ getTableEnvironment().createFunction(s, aClass);
+ }
+
+ @Override
+ default void createFunction(String s, Class<? extends UserDefinedFunction> aClass, boolean b) {
+ getTableEnvironment().createFunction(s, aClass, b);
+ }
+
+ @Override
+ default boolean dropFunction(String s) {
+ return getTableEnvironment().dropFunction(s);
+ }
+
+ @Override
+ default void createTemporaryFunction(String s, Class<? extends UserDefinedFunction> aClass) {
+ getTableEnvironment().createTemporaryFunction(s, aClass);
+ }
+
+ @Override
+ default void createTemporaryFunction(String s, UserDefinedFunction userDefinedFunction) {
+ getTableEnvironment().createTemporaryFunction(s, userDefinedFunction);
+ }
+
+ @Override
+ default boolean dropTemporaryFunction(String s) {
+ return getTableEnvironment().dropTemporaryFunction(s);
+ }
+
+ @Override
+ default void createTemporaryTable(String s, TableDescriptor tableDescriptor) {
+ getTableEnvironment().createTemporaryTable(s, tableDescriptor);
+ }
+
+ @Override
+ default void createTable(String s, TableDescriptor tableDescriptor) {
+ getTableEnvironment().createTable(s, tableDescriptor);
+ }
+
+ @Override
+ default void registerTable(String s, Table table) {
+ getTableEnvironment().registerTable(s, table);
+ }
+
+ @Override
+ default void createTemporaryView(String s, Table table) {
+ getTableEnvironment().createTemporaryView(s, table);
+ }
+
+ @Override
+ default Table scan(String... strings) {
+ return getTableEnvironment().scan(strings);
+ }
+
+ @Override
+ default Table from(String s) {
+ return getTableEnvironment().from(s);
+ }
+
+ @Override
+ default Table from(TableDescriptor tableDescriptor) {
+ return getTableEnvironment().from(tableDescriptor);
+ }
+
+ @Override
+ default String[] listCatalogs() {
+ return getTableEnvironment().listCatalogs();
+ }
+
+ @Override
+ default String[] listModules() {
+ return getTableEnvironment().listModules();
+ }
+
+ @Override
+ default ModuleEntry[] listFullModules() {
+ return getTableEnvironment().listFullModules();
+ }
+
+ @Override
+ default String[] listDatabases() {
+ return getTableEnvironment().listDatabases();
+ }
+
+ @Override
+ default String[] listTables() {
+ return getTableEnvironment().listTables();
+ }
+
+ @Override
+ default String[] listTables(String s, String s1) {
+ return getTableEnvironment().listTables(s, s1);
+ }
+
+ @Override
+ default String[] listViews() {
+ return getTableEnvironment().listViews();
+ }
+
+ @Override
+ default String[] listTemporaryTables() {
+ return getTableEnvironment().listTemporaryTables();
+ }
+
+ @Override
+ default String[] listTemporaryViews() {
+ return getTableEnvironment().listTemporaryViews();
+ }
+
+ @Override
+ default String[] listUserDefinedFunctions() {
+ return getTableEnvironment().listUserDefinedFunctions();
+ }
+
+ @Override
+ default String[] listFunctions() {
+ return getTableEnvironment().listFunctions();
+ }
+
+ @Override
+ default boolean dropTemporaryTable(String s) {
+ return getTableEnvironment().dropTemporaryTable(s);
+ }
+
+ @Override
+ default boolean dropTemporaryView(String s) {
+ return getTableEnvironment().dropTemporaryView(s);
+ }
+
+ @Override
+ default String explainSql(String s, ExplainDetail... explainDetails) {
+ return getTableEnvironment().explainSql(s, explainDetails);
+ }
+
+ @Override
+ default String[] getCompletionHints(String s, int i) {
+ return getTableEnvironment().getCompletionHints(s, i);
+ }
+
+ @Override
+ default Table sqlQuery(String s) {
+ return getTableEnvironment().sqlQuery(s);
+ }
+
+ @Override
+ default TableResult executeSql(String s) {
+ return getTableEnvironment().executeSql(s);
+ }
+
+ @Override
+ default String getCurrentCatalog() {
+ return getTableEnvironment().getCurrentCatalog();
+ }
+
+ @Override
+ default void useCatalog(String s) {
+ getTableEnvironment().useCatalog(s);
+ }
+
+ @Override
+ default String getCurrentDatabase() {
+ return getTableEnvironment().getCurrentDatabase();
+ }
+
+ @Override
+ default void useDatabase(String s) {
+ getTableEnvironment().useDatabase(s);
+ }
+
+ @Override
+ default TableConfig getConfig() {
+ return getTableEnvironment().getConfig();
+ }
+
+ @Override
+ default StatementSet createStatementSet() {
+ return getTableEnvironment().createStatementSet();
+ }
+
+ @Override
+ default CompiledPlan loadPlan(PlanReference planReference) throws TableException {
+ return getTableEnvironment().loadPlan(planReference);
+ }
+
+ @Override
+ default CompiledPlan compilePlanSql(String s) throws TableException {
+ return getTableEnvironment().compilePlanSql(s);
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultTableEnvironmentInternal.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultTableEnvironmentInternal.java
new file mode 100644
index 0000000000..72f1a3b9b7
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/DefaultTableEnvironmentInternal.java
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.table.api.CompiledPlan;
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.ExplainFormat;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.delegation.InternalPlan;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.utils.OperationTreeBuilder;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sources.TableSource;
+
+import java.util.List;
+
+/** A {@link TableEnvironmentInternal} that delegates all calls to the wrapped internal environment returned by {@link #getTableEnvironmentInternal()}. */
+public interface DefaultTableEnvironmentInternal extends TableEnvironmentInternal, TableEnvironmentInstance {
+
+ default TableEnvironmentInternal getTableEnvironmentInternal() {
+ return (TableEnvironmentInternal) getTableEnvironment();
+ }
+
+ // region TableEnvironmentInternal interface
+ @Override
+ default Parser getParser() {
+ return getTableEnvironmentInternal().getParser();
+ }
+
+ @Override
+ default CatalogManager getCatalogManager() {
+ return getTableEnvironmentInternal().getCatalogManager();
+ }
+
+ @Override
+ default OperationTreeBuilder getOperationTreeBuilder() {
+ return getTableEnvironmentInternal().getOperationTreeBuilder();
+ }
+
+ @Override
+ default Table fromTableSource(TableSource<?> tableSource) {
+ return getTableEnvironmentInternal().fromTableSource(tableSource);
+ }
+
+ @Override
+ default TableResultInternal executeInternal(List<ModifyOperation> list) {
+ return getTableEnvironmentInternal().executeInternal(list);
+ }
+
+ @Override
+ default TableResultInternal executeInternal(Operation operation) {
+ return getTableEnvironmentInternal().executeInternal(operation);
+ }
+
+ @Override
+ default String explainInternal(List<Operation> list, ExplainDetail... explainDetails) {
+ return getTableEnvironmentInternal().explainInternal(list, explainDetails);
+ }
+
+ @Override
+ default void registerTableSourceInternal(String s, TableSource<?> tableSource) {
+ getTableEnvironmentInternal().registerTableSourceInternal(s, tableSource);
+ }
+
+ @Override
+ default void registerTableSinkInternal(String s, TableSink<?> tableSink) {
+ getTableEnvironmentInternal().registerTableSinkInternal(s, tableSink);
+ }
+
+ @Override
+ default CompiledPlan compilePlan(List<ModifyOperation> list) {
+ return getTableEnvironmentInternal().compilePlan(list);
+ }
+
+ @Override
+ default TableResultInternal executePlan(InternalPlan internalPlan) {
+ return getTableEnvironmentInternal().executePlan(internalPlan);
+ }
+
+ @Override
+ default String explainPlan(InternalPlan internalPlan, ExplainDetail... explainDetails) {
+ return getTableEnvironmentInternal().explainPlan(internalPlan, explainDetails);
+ }
+
+ @Override
+ default String explainInternal(List<Operation> operations, ExplainFormat format, ExplainDetail... extraDetails) {
+ return getTableEnvironmentInternal().explainInternal(operations, format, extraDetails);
+ }
+ // endregion
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ExtendedParser.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ExtendedParser.java
new file mode 100644
index 0000000000..42ffd70783
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ExtendedParser.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.table.delegation.Parser;
+
+/** A {@link Parser} that additionally exposes the wrapped {@link CustomParser}. */
+public interface ExtendedParser extends Parser {
+ CustomParser getCustomParser();
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ParserWrapper.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ParserWrapper.java
new file mode 100644
index 0000000000..922ebf62ea
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/ParserWrapper.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+public class ParserWrapper implements ExtendedParser {
+
+ private CustomParser customParser;
+
+ public ParserWrapper(CustomParser customParser) {
+ this.customParser = customParser;
+ }
+
+ @Override
+ public List<Operation> parse(String statement) {
+ List<Operation> result = customParser.parse(statement);
+ if (result != null) {
+ return result;
+ }
+
+ return customParser.getParser().parse(statement);
+ }
+
+ @Override
+ public UnresolvedIdentifier parseIdentifier(String identifier) {
+ return customParser.getParser().parseIdentifier(identifier);
+ }
+
+ @Override
+ public ResolvedExpression parseSqlExpression(
+ String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
+ return customParser.getParser().parseSqlExpression(sqlExpression, inputRowType, outputType);
+ }
+
+ @Override
+ public String[] getCompletionHints(String statement, int position) {
+ return customParser.getParser().getCompletionHints(statement, position);
+ }
+
+ @Override
+ public CustomParser getCustomParser() {
+ return customParser;
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/StaticResultProvider.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/StaticResultProvider.java
new file mode 100644
index 0000000000..bc88b875fa
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/StaticResultProvider.java
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.ResultProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.utils.print.PrintStyle;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.List;
+import java.util.function.Function;
+
+/** Create result provider from a static set of data using external types. */
+@Internal
+public class StaticResultProvider implements ResultProvider {
+
+ /**
+ * This converter supports only String, long, int and boolean fields. Moreover, this converter
+ * works only with {@link GenericRowData}.
+ */
+ public static final RowDataToStringConverter SIMPLE_ROW_DATA_TO_STRING_CONVERTER = rowData -> {
+ GenericRowData genericRowData = (GenericRowData) rowData;
+ String[] results = new String[rowData.getArity()];
+ for (int i = 0; i < results.length; i++) {
+ Object value = genericRowData.getField(i);
+ if (Boolean.TRUE.equals(value)) {
+ results[i] = "TRUE";
+ } else if (Boolean.FALSE.equals(value)) {
+ results[i] = "FALSE";
+ } else {
+ results[i] = value == null ? PrintStyle.NULL_VALUE : "" + value;
+ }
+ }
+ return results;
+ };
+
+ private final List<Row> rows;
+ private final Function<Row, RowData> externalToInternalConverter;
+
+ public StaticResultProvider(List<Row> rows) {
+ this(rows, StaticResultProvider::rowToInternalRow);
+ }
+
+ public StaticResultProvider(List<Row> rows, Function<Row, RowData> externalToInternalConverter) {
+ this.rows = rows;
+ this.externalToInternalConverter = externalToInternalConverter;
+ }
+
+ @Override
+ public StaticResultProvider setJobClient(JobClient jobClient) {
+ return this;
+ }
+
+ @Override
+ public CloseableIterator<RowData> toInternalIterator() {
+ return CloseableIterator.adapterForIterator(
+ this.rows.stream().map(this.externalToInternalConverter).iterator());
+ }
+
+ @Override
+ public CloseableIterator<Row> toExternalIterator() {
+ return CloseableIterator.adapterForIterator(this.rows.iterator());
+ }
+
+ @Override
+ public RowDataToStringConverter getRowDataStringConverter() {
+ return SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
+ }
+
+ @Override
+ public boolean isFirstRowReady() {
+ return true;
+ }
+
+ /** This function supports only String, long, int and boolean fields. */
+ @VisibleForTesting
+ static RowData rowToInternalRow(Row row) {
+ Object[] values = new Object[row.getArity()];
+ for (int i = 0; i < row.getArity(); i++) {
+ Object value = row.getField(i);
+ if (value == null) {
+ values[i] = null;
+ } else if (value instanceof String) {
+ values[i] = StringData.fromString((String) value);
+ } else if (value instanceof Boolean || value instanceof Long || value instanceof Integer) {
+ values[i] = value;
+ } else {
+ throw new TableException("Cannot convert row type");
+ }
+ }
+
+ return GenericRowData.of(values);
+ }
+}
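A quick sketch of how the provider behaves (rows are illustrative): external Rows go in, and the internal iterator converts them lazily through rowToInternalRow:

    import java.util.Arrays;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;

    StaticResultProvider provider =
            new StaticResultProvider(Arrays.asList(Row.of("a", 1L), Row.of("b", 2L)));
    try (CloseableIterator<RowData> it = provider.toInternalIterator()) {
        while (it.hasNext()) {
            // the simple converter renders e.g. "a | 1"
            String[] cells = StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER.convert(it.next());
            System.out.println(String.join(" | ", cells));
        }
    } // close() may throw, so the enclosing method would declare `throws Exception`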
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/TableSchemaField.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/TableSchemaField.java
new file mode 100644
index 0000000000..88dc81b567
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/executor/TableSchemaField.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.table.types.DataType;
+
+/** @since 2022/11/04 */
+public class TableSchemaField {
+
+ private String name;
+ private DataType type;
+
+ public TableSchemaField(String name, DataType type) {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public DataType getType() {
+ return type;
+ }
+
+ public void setType(DataType type) {
+ this.type = type;
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/CustomNewParserImpl.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/CustomNewParserImpl.java
new file mode 100644
index 0000000000..58b8099d2e
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/CustomNewParserImpl.java
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.operations;
+
+import org.dinky.parser.CustomParserImpl;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.planner.parse.ExtendedParser;
+
+public class CustomNewParserImpl extends CustomParserImpl {
+
+ private final DinkyParser dinkyParser;
+
+ public CustomNewParserImpl(TableEnvironment tableEnvironment, Parser parser) {
+ super(parser);
+ this.dinkyParser = new DinkyParser(tableEnvironment);
+ }
+
+ @Override
+ public ExtendedParser getDinkyParser() {
+ return this.dinkyParser;
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyExecutableOperation.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyExecutableOperation.java
new file mode 100644
index 0000000000..f30641265d
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyExecutableOperation.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.operations;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
+import org.apache.flink.table.operations.Operation;
+
+public class DinkyExecutableOperation implements ExecutableOperation {
+
+ private final Operation innerOperation;
+ private final TableEnvironment tableEnvironment;
+
+ public DinkyExecutableOperation(TableEnvironment tableEnvironment, Operation innerOperation) {
+ this.tableEnvironment = tableEnvironment;
+ this.innerOperation = innerOperation;
+ }
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+ DinkyOperationExecutor operationExecutor = new DinkyOperationExecutor(tableEnvironment, ctx);
+ return operationExecutor.executeOperation(innerOperation).get();
+ }
+
+ public Operation getInnerOperation() {
+ return innerOperation;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return innerOperation.asSummaryString();
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyOperationExecutor.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyOperationExecutor.java
new file mode 100644
index 0000000000..4892fea153
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyOperationExecutor.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.operations;
+
+import org.dinky.executor.CustomTableEnvironment;
+import org.dinky.trans.ExtendOperation;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.ExecutableOperation;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
+
+public class DinkyOperationExecutor {
+ private final ExecutableOperation.Context context;
+
+ private final TableEnvironment tableEnvironment;
+
+ public DinkyOperationExecutor(TableEnvironment tableEnvironment, ExecutableOperation.Context context) {
+ this.tableEnvironment = tableEnvironment;
+ this.context = context;
+ }
+
+ public Optional<TableResultInternal> executeOperation(Operation operation) {
+ ExtendOperation extendOperation = (ExtendOperation) operation;
+ return Optional.of((TableResultInternal) extendOperation
+ .execute((CustomTableEnvironment) tableEnvironment)
+ .get());
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyParser.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyParser.java
new file mode 100644
index 0000000000..94ec72e4ae
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/operations/DinkyParser.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.operations;
+
+import org.dinky.parser.DinkyExtendedParser;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.planner.parse.ExtendedParseStrategy;
+
+import java.util.Optional;
+
+public class DinkyParser extends DinkyExtendedParser {
+ private final TableEnvironment tableEnvironment;
+
+ public DinkyParser(TableEnvironment tableEnvironment) {
+ this.tableEnvironment = tableEnvironment;
+ }
+
+ @Override
+ public Optional<Operation> parse(String statement) {
+ for (ExtendedParseStrategy strategy : PARSE_STRATEGIES) {
+ if (strategy.match(statement)) {
+ return Optional.of(new DinkyExecutableOperation(this.tableEnvironment, strategy.convert(statement)));
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FlinkUtil.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FlinkUtil.java
new file mode 100644
index 0000000000..3ba2ba4397
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FlinkUtil.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * FlinkUtil
+ *
+ * @since 2022/05/08
+ */
+public class FlinkUtil {
+
+ public static List<String> getFieldNamesFromCatalogManager(
+ CatalogManager catalogManager, String catalog, String database, String table) {
+ Optional<ContextResolvedTable> tableOpt =
+ catalogManager.getTable(ObjectIdentifier.of(catalog, database, table));
+ if (tableOpt.isPresent()) {
+ return tableOpt.get().getResolvedSchema().getColumnNames();
+ } else {
+ return new ArrayList<>();
+ }
+ }
+
+ public static List<String> catchColumn(TableResult tableResult) {
+ return tableResult.getResolvedSchema().getColumnNames();
+ }
+
+ public static String triggerSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint)
+ throws ExecutionException, InterruptedException {
+ return clusterClient
+ .triggerSavepoint(JobID.fromHexString(jobId), savePoint, SavepointFormatType.DEFAULT)
+ .get()
+ .toString();
+ }
+
+ public static String stopWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint)
+ throws ExecutionException, InterruptedException {
+ return clusterClient
+ .stopWithSavepoint(JobID.fromHexString(jobId), true, savePoint, SavepointFormatType.DEFAULT)
+ .get()
+ .toString();
+ }
+
+ public static String cancelWithSavepoint(ClusterClient<?> clusterClient, String jobId, String savePoint)
+ throws ExecutionException, InterruptedException {
+ return clusterClient
+ .cancelWithSavepoint(JobID.fromHexString(jobId), savePoint, SavepointFormatType.DEFAULT)
+ .get()
+ .toString();
+ }
+}
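All three savepoint helpers follow the same pattern and differ only in which ClusterClient call they delegate to; a hedged sketch of invoking one, where the job id and savepoint path are placeholders:

    import org.apache.flink.client.program.ClusterClient;

    static String stopJob(ClusterClient<?> client) throws Exception {
        // placeholder job id and target directory
        return FlinkUtil.stopWithSavepoint(
                client, "d8e1bf2dca0a4abb5e5893dbbbc0b9fa", "hdfs:///flink/savepoints");
    }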
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FunctionVisitor.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FunctionVisitor.java
new file mode 100644
index 0000000000..1f774297b7
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/FunctionVisitor.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FunctionVisitor extends SqlBasicVisitor<Void> {
+
+ private final List<UnresolvedIdentifier> functionList = new ArrayList<>();
+
+ @Override
+ public Void visit(SqlCall call) {
+ if (call instanceof SqlBasicCall && call.getOperator() instanceof SqlFunction) {
+ SqlFunction function = (SqlFunction) call.getOperator();
+ SqlIdentifier opName = function.getNameAsId();
+
+ functionList.add(UnresolvedIdentifier.of(opName.names));
+ }
+ return super.visit(call);
+ }
+
+ public List<UnresolvedIdentifier> getFunctionList() {
+ return functionList;
+ }
+}
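The visitor is meant to run over an already-validated SqlNode tree, as LineageContext#analyzeFunction does below; a sketch, where validatedSqlNode is assumed to come from the custom parser:

    FunctionVisitor visitor = new FunctionVisitor();
    validatedSqlNode.accept(visitor);
    for (UnresolvedIdentifier id : visitor.getFunctionList()) {
        System.out.println(id.asSummaryString());
    }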
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/LineageContext.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/LineageContext.java
new file mode 100644
index 0000000000..c924b9c86d
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/LineageContext.java
@@ -0,0 +1,194 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import org.dinky.data.model.FunctionResult;
+import org.dinky.data.model.LineageRel;
+import org.dinky.executor.CustomParser;
+import org.dinky.executor.CustomTableEnvironment;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.metadata.RelColumnOrigin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.SinkModifyOperation;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * LineageContext
+ *
+ * @since 2022/11/22
+ */
+public class LineageContext {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LineageContext.class);
+
+ private final CustomTableEnvironment tableEnv;
+
+ public LineageContext(CustomTableEnvironment tableEnv) {
+ this.tableEnv = tableEnv;
+ }
+
+ public List<LineageRel> analyzeLineage(String statement) {
+ // 1. Generate the original RelNode tree
+ Tuple2<String, RelNode> parsed = parseStatement(statement);
+ String sinkTable = parsed.getField(0);
+ RelNode oriRelNode = parsed.getField(1);
+
+ // 2. Build lineage based on RelMetadataQuery
+ return buildFieldLineageResult(sinkTable, oriRelNode);
+ }
+
+ private Tuple2<String, RelNode> parseStatement(String sql) {
+ List<Operation> operations = tableEnv.getParser().parse(sql);
+
+ if (operations.size() != 1) {
+ throw new TableException("Unsupported SQL query! Only a single SQL statement is accepted.");
+ }
+ Operation operation = operations.get(0);
+ if (operation instanceof SinkModifyOperation) {
+ SinkModifyOperation sinkOperation = (SinkModifyOperation) operation;
+
+ PlannerQueryOperation queryOperation = (PlannerQueryOperation) sinkOperation.getChild();
+ RelNode relNode = queryOperation.getCalciteTree();
+ return new Tuple2<>(
+ sinkOperation.getContextResolvedTable().getIdentifier().asSummaryString(), relNode);
+ } else {
+ throw new TableException("Only insert is supported now.");
+ }
+ }
+
+ /** Check that the number of query fields and sink fields match. */
+ private void validateSchema(String sinkTable, RelNode relNode, List<String> sinkFieldList) {
+ List<String> queryFieldList = relNode.getRowType().getFieldNames();
+ if (queryFieldList.size() != sinkFieldList.size()) {
+ throw new ValidationException(String.format(
+ "Column types of query result and sink for %s do not match.\n"
+ + "Query schema: %s\n"
+ + "Sink schema: %s",
+ sinkTable, queryFieldList, sinkFieldList));
+ }
+ }
+
+ private List<LineageRel> buildFieldLineageResult(String sinkTable, RelNode optRelNode) {
+ // target columns
+ List<String> targetColumnList =
+ tableEnv.from(sinkTable).getResolvedSchema().getColumnNames();
+
+ // check the size of query and sink fields match
+ validateSchema(sinkTable, optRelNode, targetColumnList);
+
+ RelMetadataQuery metadataQuery = optRelNode.getCluster().getMetadataQuery();
+ List<LineageRel> resultList = new ArrayList<>();
+
+ for (int index = 0; index < targetColumnList.size(); index++) {
+ String targetColumn = targetColumnList.get(index);
+
+ Set<RelColumnOrigin> relColumnOriginSet = metadataQuery.getColumnOrigins(optRelNode, index);
+
+ if (CollectionUtils.isNotEmpty(relColumnOriginSet)) {
+ for (RelColumnOrigin relColumnOrigin : relColumnOriginSet) {
+ // table
+ RelOptTable table = relColumnOrigin.getOriginTable();
+ String sourceTable = String.join(".", table.getQualifiedName());
+
+ // field
+ int ordinal = relColumnOrigin.getOriginColumnOrdinal();
+ List<String> fieldNames = ((TableSourceTable) table)
+ .contextResolvedTable()
+ .getResolvedSchema()
+ .getColumnNames();
+ String sourceColumn = fieldNames.get(ordinal);
+
+ // add record
+ resultList.add(LineageRel.build(
+ sourceTable, sourceColumn, sinkTable, targetColumn, relColumnOrigin.getTransform()));
+ }
+ }
+ }
+ return resultList;
+ }
+
+ /**
+ * Analyze custom functions from SQL, does not contain system functions.
+ *
+ * @param singleSql the SQL statement to analyze
+ * @return custom functions set
+ */
+ public Set<FunctionResult> analyzeFunction(String singleSql) {
+ LOG.info("Analyze function Sql: \n {}", singleSql);
+ CustomParser parser = (CustomParser) tableEnv.getParser();
+
+ // parsing sql and return the abstract syntax tree
+ SqlNode sqlNode = parser.parseSql(singleSql);
+
+ // validate the query
+ SqlNode validated = parser.validate(sqlNode);
+
+ // look for all functions
+ FunctionVisitor visitor = new FunctionVisitor();
+ validated.accept(visitor);
+ List<UnresolvedIdentifier> fullFunctionList = visitor.getFunctionList();
+
+ // filter custom functions
+ Set<FunctionResult> resultSet = new HashSet<>();
+ for (UnresolvedIdentifier unresolvedIdentifier : fullFunctionList) {
+ getFunctionCatalog()
+ .lookupFunction(unresolvedIdentifier)
+ .flatMap(ContextResolvedFunction::getIdentifier)
+ // the objectIdentifier of the built-in function is null
+ .flatMap(FunctionIdentifier::getIdentifier)
+ .ifPresent(identifier -> {
+ FunctionResult functionResult = new FunctionResult()
+ .setCatalogName(identifier.getCatalogName())
+ .setDatabase(identifier.getDatabaseName())
+ .setFunctionName(identifier.getObjectName());
+ LOG.debug("analyzed function: {}", functionResult);
+ resultSet.add(functionResult);
+ });
+ }
+ return resultSet;
+ }
+
+ private FunctionCatalog getFunctionCatalog() {
+ PlannerBase planner = (PlannerBase) tableEnv.getPlanner();
+ return planner.getFlinkContext().getFunctionCatalog();
+ }
+}
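A usage sketch (table names are illustrative and must already be registered in the catalog): each returned LineageRel links one source column to one sink column, carrying the transform expression when the column is derived:

    LineageContext context = new LineageContext(customTableEnvironment);
    List<LineageRel> lineage = context.analyzeLineage(
            "INSERT INTO dwd_orders SELECT id, UPPER(user_name) FROM ods_orders");
    // e.g. ods_orders.user_name feeds the second sink column via UPPER(user_name)
    lineage.forEach(System.out::println);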
diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/ObjectConvertUtil.java b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/ObjectConvertUtil.java
new file mode 100644
index 0000000000..a8000ff74d
--- /dev/null
+++ b/dinky-client/dinky-client-1.19/src/main/java/org/dinky/utils/ObjectConvertUtil.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.utils;
+
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+import java.time.ZoneId;
+
+import javax.xml.bind.DatatypeConverter;
+
+/** ObjectConvertUtil */
+public class ObjectConvertUtil {
+
+ public static Object convertValue(Object value, LogicalType logicalType) {
+ return ObjectConvertUtil.convertValue(value, logicalType, null);
+ }
+
+ public static Object convertValue(Object value, LogicalType logicalType, ZoneId sinkTimeZone) {
+ if (value == null) {
+ return null;
+ }
+ if (sinkTimeZone == null) {
+ sinkTimeZone = ZoneId.of("UTC");
+ }
+ if (logicalType instanceof DateType) {
+ if (value instanceof Integer) {
+ return Instant.ofEpochMilli(((Integer) value).longValue())
+ .atZone(sinkTimeZone)
+ .toLocalDate();
+ } else {
+ return Instant.ofEpochMilli((long) value)
+ .atZone(ZoneId.systemDefault())
+ .toLocalDate();
+ }
+ } else if (logicalType instanceof TimestampType) {
+ if (value instanceof Integer) {
+ return Instant.ofEpochMilli(((Integer) value).longValue())
+ .atZone(sinkTimeZone)
+ .toLocalDateTime();
+ } else if (value instanceof String) {
+ return Instant.parse((String) value)
+ .atZone(ZoneId.systemDefault())
+ .toLocalDateTime();
+ } else {
+ return Instant.ofEpochMilli((long) value).atZone(sinkTimeZone).toLocalDateTime();
+ }
+ } else if (logicalType instanceof DecimalType) {
+ return new BigDecimal(String.valueOf(value));
+ } else if (logicalType instanceof BigIntType) {
+ if (value instanceof Integer) {
+ return ((Integer) value).longValue();
+ } else {
+ return value;
+ }
+ } else if (logicalType instanceof VarBinaryType) {
+ // VARBINARY and BINARY are converted to base64-encoded Strings by FlinkCDC.
+ if (value instanceof String) {
+ return DatatypeConverter.parseBase64Binary((String) value);
+ } else {
+ return value;
+ }
+ } else {
+ return value;
+ }
+ }
+}
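For example, a TIMESTAMP value arriving as epoch milliseconds is localized to the sink time zone (the literal below is illustrative):

    import java.time.ZoneId;
    import org.apache.flink.table.types.logical.TimestampType;

    Object v = ObjectConvertUtil.convertValue(1700000000000L, new TimestampType(), ZoneId.of("UTC"));
    // -> LocalDateTime 2023-11-14T22:13:20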
diff --git a/dinky-client/dinky-client-base/pom.xml b/dinky-client/dinky-client-base/pom.xml
index 17013ee533..82afcd8b00 100644
--- a/dinky-client/dinky-client-base/pom.xml
+++ b/dinky-client/dinky-client-base/pom.xml
@@ -159,6 +159,11 @@
 <artifactId>dinky-flink-1.18</artifactId>
 <scope>${scope.runtime}</scope>
 </dependency>
+ <dependency>
+ <groupId>org.dinky</groupId>
+ <artifactId>dinky-flink-1.19</artifactId>
+ <scope>${scope.runtime}</scope>
+ </dependency>
diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/ClusterDescriptorAdapter.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/ClusterDescriptorAdapter.java
new file mode 100644
index 0000000000..f372375b66
--- /dev/null
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/executor/ClusterDescriptorAdapter.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.dinky.executor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.yarn.YarnClusterDescriptor;
+
+import java.io.File;
+import java.util.List;
+
+public abstract class ClusterDescriptorAdapter {
+
+ protected YarnClusterDescriptor yarnClusterDescriptor;
+
+ public ClusterDescriptorAdapter() {}
+
+ public ClusterDescriptorAdapter(YarnClusterDescriptor yarnClusterDescriptor) {
+ this.yarnClusterDescriptor = yarnClusterDescriptor;
+ }
+
+ public abstract void addShipFiles(List<File> shipFiles);
+
+ public abstract KubernetesClusterDescriptor createKubernetesClusterDescriptor(
+ Configuration configuration, FlinkKubeClient flinkKubeClient);
+}
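The adapter exists mainly because Flink 1.19 changed YarnClusterDescriptor#addShipFiles to take org.apache.flink.core.fs.Path instead of java.io.File (my reading of the 1.19 API, not something this hunk states). A rough sketch of what a 1.19-side implementation could look like; the class name is invented and the Kubernetes factory method is left abstract here:

package org.dinky.executor;

import org.apache.flink.core.fs.Path;
import org.apache.flink.yarn.YarnClusterDescriptor;

import java.io.File;
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: converts the List<File> contract of the base adapter into the
// List<Path> that Flink 1.19's YarnClusterDescriptor#addShipFiles expects.
public abstract class Flink119AdapterSketch extends ClusterDescriptorAdapter {

    public Flink119AdapterSketch(YarnClusterDescriptor yarnClusterDescriptor) {
        super(yarnClusterDescriptor);
    }

    @Override
    public void addShipFiles(List<File> shipFiles) {
        yarnClusterDescriptor.addShipFiles(
                shipFiles.stream().map(file -> new Path(file.toURI())).collect(Collectors.toList()));
    }
}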
diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/flink/checkpoint/base/BaseTypeCheckpointRead.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/flink/checkpoint/base/BaseTypeCheckpointRead.java
index cb8420f4b6..dc7effa28f 100644
--- a/dinky-client/dinky-client-base/src/main/java/org/dinky/flink/checkpoint/base/BaseTypeCheckpointRead.java
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/flink/checkpoint/base/BaseTypeCheckpointRead.java
@@ -22,6 +22,7 @@
import org.dinky.data.model.CheckPointReadTable;
import org.dinky.flink.checkpoint.BaseCheckpointRead;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.PartitionableListState;
@@ -56,7 +57,9 @@ private static TypeSerializer<?> getTypeSerializer(PartitionableListState<?> par
 Map<Class<?>, BasicTypeInfo<?>> types = (Map<Class<?>, BasicTypeInfo<?>>)
 ReflectUtil.getStaticFieldValue(ReflectUtil.getField(BasicTypeInfo.class, "TYPES"));
 for (Map.Entry<Class<?>, BasicTypeInfo<?>> entry : types.entrySet()) {
- TypeSerializer<?> serializer = entry.getValue().createSerializer(null);
+ // Type the null as ExecutionConfig so the overload choice is unambiguous on Flink 1.19.
+ ExecutionConfig executionConfig = null;
+ TypeSerializer<?> serializer = entry.getValue().createSerializer(executionConfig);
boolean equals = getArrayListSerializer(partitionableListState)
.getElementSerializer()
.getClass()
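Context for the typed null above (my understanding of FLIP-398 in Flink 1.19, not stated by this patch):

// Flink 1.19 gives TypeInformation a second overload:
//   TypeSerializer<T> createSerializer(ExecutionConfig config)   // pre-1.19, now deprecated
//   TypeSerializer<T> createSerializer(SerializerConfig config)  // added in 1.19
// A bare createSerializer(null) is therefore ambiguous and no longer compiles,
// hence the explicitly typed ExecutionConfig variable introduced here.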
diff --git a/dinky-client/pom.xml b/dinky-client/pom.xml
index b39dc89df1..a61b34f447 100644
--- a/dinky-client/pom.xml
+++ b/dinky-client/pom.xml
@@ -51,6 +51,7 @@
 <module>dinky-client-1.16</module>
 <module>dinky-client-1.17</module>
 <module>dinky-client-1.18</module>
+ <module>dinky-client-1.19</module>
diff --git a/dinky-core/pom.xml b/dinky-core/pom.xml
index bba08a0054..6482b49437 100644
--- a/dinky-core/pom.xml
+++ b/dinky-core/pom.xml
@@ -342,6 +342,22 @@
 <artifactId>dinky-flink-1.18</artifactId>
 <scope>${scope.runtime}</scope>
+
+        <dependency>
+            <groupId>org.dinky</groupId>
+            <artifactId>dinky-client-1.19</artifactId>
+            <scope>${scope.runtime}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.dinky</groupId>
+            <artifactId>dinky-catalog-mysql-1.19</artifactId>
+            <scope>${scope.runtime}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.dinky</groupId>
+            <artifactId>dinky-flink-1.19</artifactId>
+            <scope>${scope.runtime}</scope>
+        </dependency>
@@ -381,5 +397,14 @@
+        <profile>
+            <id>flink-1.19</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.dinky</groupId>
+                    <artifactId>dinky-cdc-plus</artifactId>
+                </dependency>
+            </dependencies>
+        </profile>
diff --git a/dinky-flink/dinky-flink-1.19/pom.xml b/dinky-flink/dinky-flink-1.19/pom.xml
new file mode 100644
index 0000000000..75d83b5387
--- /dev/null
+++ b/dinky-flink/dinky-flink-1.19/pom.xml
@@ -0,0 +1,158 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.dinky</groupId>
+        <artifactId>dinky-flink</artifactId>
+        <version>${revision}</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>dinky-flink-1.19</artifactId>
+
+    <packaging>jar</packaging>
+
+    <name>Dinky : Flink 1.19</name>
+
+    <properties>
+        <commons.version>1.3.1</commons.version>
+        <flink.shaded.version>17.0</flink.shaded.version>
+        <flink.version>1.19.0</flink.version>
+        <flinkcdc.version>3.0.1</flinkcdc.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-python</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>3.1.1-1.17</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-statebackend-rocksdb</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-yarn</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-kubernetes</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka</artifactId>
+            <version>3.0.1-1.18</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-${flink.shaded.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-sqlserver-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-sql-connector-postgres-cdc</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-cli</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-pipeline-connector-mysql</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-pipeline-connector-doris</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>flink-cdc-pipeline-connector-starrocks</artifactId>
+            <version>${flinkcdc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>${commons.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>flink-doris-connector-1.18</artifactId>
+            <version>1.5.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-state-processor-api</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-s3-fs-presto</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/dinky-flink/pom.xml b/dinky-flink/pom.xml
index 1bd966e157..0d49ae09b5 100644
--- a/dinky-flink/pom.xml
+++ b/dinky-flink/pom.xml
@@ -27,6 +27,9 @@
 <artifactId>dinky-flink</artifactId>
 <packaging>pom</packaging>
 <name>Dinky : Flink</name>
+
+
+
@@ -43,6 +46,7 @@
 <module>dinky-flink-1.16</module>
 <module>dinky-flink-1.17</module>
 <module>dinky-flink-1.18</module>
+ <module>dinky-flink-1.19</module>
diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java
index 49ec154ca2..74c6f4ec4a 100644
--- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java
+++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesApplicationGateway.java
@@ -23,6 +23,7 @@
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.GatewayType;
import org.dinky.data.model.SystemConfiguration;
+import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.exception.GatewayException;
import org.dinky.gateway.kubernetes.utils.IgnoreNullRepresenter;
@@ -146,8 +147,9 @@ public ClusterClientProvider<String> deployApplication(FlinkKubeClient client) t
// Deploy to k8s
ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(userJarParas, appConfig.getUserJarMainAppClass());
+ ClusterDescriptorAdapterImpl clusterDescriptorAdapter = new ClusterDescriptorAdapterImpl();
KubernetesClusterDescriptor kubernetesClusterDescriptor =
- new KubernetesClusterDescriptor(configuration, client);
+ clusterDescriptorAdapter.createKubernetesClusterDescriptor(configuration, client);
return kubernetesClusterDescriptor.deployApplicationCluster(
clusterSpecificationBuilder.createClusterSpecification(), applicationConfiguration);
}
diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesSessionGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesSessionGateway.java
index 162cf05609..04515d0554 100644
--- a/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesSessionGateway.java
+++ b/dinky-gateway/src/main/java/org/dinky/gateway/kubernetes/KubernetesSessionGateway.java
@@ -21,6 +21,7 @@
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.GatewayType;
+import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.KubernetesResult;
@@ -54,8 +55,10 @@ public GatewayResult deployCluster(FlinkUdfPathContextHolder udfPathContextHolde
createClusterSpecificationBuilder();
KubernetesResult result = KubernetesResult.build(getType());
- try (KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(
- configuration, getK8sClientHelper().getClient())) {
+ ClusterDescriptorAdapterImpl clusterDescriptorAdapter = new ClusterDescriptorAdapterImpl();
+ try (KubernetesClusterDescriptor kubernetesClusterDescriptor =
+ clusterDescriptorAdapter.createKubernetesClusterDescriptor(
+ configuration, getK8sClientHelper().getClient())) {
 ClusterClientProvider<String> clusterClientProvider = kubernetesClusterDescriptor.deploySessionCluster(
 clusterSpecificationBuilder.createClusterSpecification());
 ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient();
diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java
index e9f2f45fa0..aae4b6092c 100644
--- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java
+++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnApplicationGateway.java
@@ -23,6 +23,7 @@
import org.dinky.constant.CustomerConfigureOptions;
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.GatewayType;
+import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.config.AppConfig;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.YarnResult;
@@ -37,11 +38,10 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.stream.Collectors;
-import cn.hutool.core.collection.CollUtil;
-
/**
* YarnApplicationGateway
*
@@ -77,8 +77,9 @@ public GatewayResult submitJar(FlinkUdfPathContextHolder udfPathContextHolder) {
YarnResult result = YarnResult.build(getType());
String webUrl;
try (YarnClusterDescriptor yarnClusterDescriptor = createYarnClusterDescriptorWithJar(udfPathContextHolder)) {
-
- yarnClusterDescriptor.addShipFiles(CollUtil.newArrayList(preparSqlFile()));
+ ClusterDescriptorAdapterImpl clusterDescriptorAdapter =
+ new ClusterDescriptorAdapterImpl(yarnClusterDescriptor);
+ clusterDescriptorAdapter.addShipFiles(Arrays.asList(preparSqlFile()));
addConfigParas(
CustomerConfigureOptions.EXEC_SQL_FILE, configuration.get(CustomerConfigureOptions.EXEC_SQL_FILE));
diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
index 034f75f80d..05c51d9d22 100644
--- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
+++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnGateway.java
@@ -24,6 +24,7 @@
import org.dinky.context.FlinkUdfPathContextHolder;
import org.dinky.data.enums.JobStatus;
import org.dinky.data.model.SystemConfiguration;
+import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.AbstractGateway;
import org.dinky.gateway.config.ClusterConfig;
import org.dinky.gateway.config.FlinkConfig;
@@ -319,16 +320,16 @@ public void killCluster() {
protected YarnClusterDescriptor createYarnClusterDescriptorWithJar(FlinkUdfPathContextHolder udfPathContextHolder) {
YarnClusterDescriptor yarnClusterDescriptor = createInitYarnClusterDescriptor();
-
+ ClusterDescriptorAdapterImpl clusterDescriptorAdapter = new ClusterDescriptorAdapterImpl(yarnClusterDescriptor);
if (Asserts.isNotNull(config.getJarPaths())) {
- yarnClusterDescriptor.addShipFiles(
+ clusterDescriptorAdapter.addShipFiles(
Arrays.stream(config.getJarPaths()).map(FileUtil::file).collect(Collectors.toList()));
- yarnClusterDescriptor.addShipFiles(new ArrayList<>(udfPathContextHolder.getPyUdfFile()));
+ clusterDescriptorAdapter.addShipFiles(new ArrayList<>(udfPathContextHolder.getPyUdfFile()));
}
 Set<File> otherPluginsFiles = udfPathContextHolder.getAllFileSet();
if (CollUtil.isNotEmpty(otherPluginsFiles)) {
- yarnClusterDescriptor.addShipFiles(CollUtil.newArrayList(otherPluginsFiles));
+ clusterDescriptorAdapter.addShipFiles(new ArrayList<>(otherPluginsFiles));
}
return yarnClusterDescriptor;
}
diff --git a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java
index 6f6da7a393..c7d09d3a47 100644
--- a/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java
+++ b/dinky-gateway/src/main/java/org/dinky/gateway/yarn/YarnPerJobGateway.java
@@ -22,6 +22,7 @@
import org.dinky.assertion.Asserts;
import org.dinky.constant.CustomerConfigureOptions;
import org.dinky.data.enums.GatewayType;
+import org.dinky.executor.ClusterDescriptorAdapterImpl;
import org.dinky.gateway.result.GatewayResult;
import org.dinky.gateway.result.YarnResult;
@@ -35,7 +36,6 @@
import java.util.Arrays;
import java.util.stream.Collectors;
-import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.URLUtil;
@@ -68,7 +68,9 @@ public GatewayResult submitJobGraph(JobGraph jobGraph) {
YarnResult result = YarnResult.build(getType());
try (YarnClusterDescriptor yarnClusterDescriptor = createInitYarnClusterDescriptor()) {
- yarnClusterDescriptor.addShipFiles(CollUtil.newArrayList(preparSqlFile()));
+ ClusterDescriptorAdapterImpl clusterDescriptorAdapter =
+ new ClusterDescriptorAdapterImpl(yarnClusterDescriptor);
+ clusterDescriptorAdapter.addShipFiles(Arrays.asList(preparSqlFile()));
addConfigParas(
CustomerConfigureOptions.EXEC_SQL_FILE, configuration.get(CustomerConfigureOptions.EXEC_SQL_FILE));
 ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployJobCluster(
diff --git a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/contants.tsx b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/contants.tsx
index d338bba1f3..6d462a0314 100644
--- a/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/contants.tsx
+++ b/dinky-web/src/pages/RegCenter/Cluster/Configuration/components/ConfigurationModal/ConfigurationForm/FlinkK8s/contants.tsx
@@ -26,5 +26,6 @@ export const versionOptions = [
{ label: '1.15', value: 'v1_15' },
{ label: '1.16', value: 'v1_16' },
{ label: '1.17', value: 'v1_17' },
- { label: '1.18', value: 'v1_18' }
+ { label: '1.18', value: 'v1_18' },
+ { label: '1.19', value: 'v1_19' }
];
diff --git a/dinky-web/src/pages/RegCenter/Document/constans.ts b/dinky-web/src/pages/RegCenter/Document/constans.ts
index 29c3428e95..cc1ca00d3a 100644
--- a/dinky-web/src/pages/RegCenter/Document/constans.ts
+++ b/dinky-web/src/pages/RegCenter/Document/constans.ts
@@ -83,6 +83,10 @@ export const VERSIONS = [
text: 'Flink-1.18',
value: '1.18'
},
+ {
+ text: 'Flink-1.19',
+ value: '1.19'
+ },
{
text: 'All Versions',
value: 'All Versions'
diff --git a/docs/docs/developer_guide/local_debug.md b/docs/docs/developer_guide/local_debug.md
index 8591481815..ed63e64e59 100644
--- a/docs/docs/developer_guide/local_debug.md
+++ b/docs/docs/developer_guide/local_debug.md
@@ -109,6 +109,7 @@ If Install/Package fails with code-format errors, see [Code Formatting]
| flink-1.16 | Pins the Flink version to 1.16; select only one flink-1.x profile, together with flink-single-version |
| flink-1.17 | Pins the Flink version to 1.17; select only one flink-1.x profile, together with flink-single-version |
| flink-1.18 | Pins the Flink version to 1.18; select only one flink-1.x profile, together with flink-single-version |
+| flink-1.19 | Pins the Flink version to 1.19; select only one flink-1.x profile, together with flink-single-version |
| jdk 11 | Builds with JDK 11 if it is installed locally; otherwise the local JDK 8 is used by default |
| mac | Adapts the build for debugging on macOS |
| maven-central | Uses Maven Central as the Maven repository |
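A hypothetical invocation combining these profiles (profile names from the table above; adjust the set to your environment):

    mvn clean install -DskipTests -P flink-1.19,flink-single-version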
diff --git a/pom.xml b/pom.xml
index 26c2535f39..a448c28154 100644
--- a/pom.xml
+++ b/pom.xml
@@ -425,6 +425,11 @@
 <artifactId>dinky-client-1.18</artifactId>
 <version>${project.version}</version>
+            <dependency>
+                <groupId>org.dinky</groupId>
+                <artifactId>dinky-client-1.19</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 <groupId>org.dinky</groupId>
 <artifactId>dinky-catalog-mysql-1.14</artifactId>
@@ -450,6 +455,11 @@
 <artifactId>dinky-catalog-mysql-1.18</artifactId>
 <version>${project.version}</version>
+            <dependency>
+                <groupId>org.dinky</groupId>
+                <artifactId>dinky-catalog-mysql-1.19</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 <groupId>org.dinky</groupId>
 <artifactId>dinky-connector-jdbc-1.14</artifactId>
@@ -606,6 +616,11 @@
 <artifactId>dinky-flink-1.18</artifactId>
 <version>${project.version}</version>
+            <dependency>
+                <groupId>org.dinky</groupId>
+                <artifactId>dinky-flink-1.19</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 <groupId>org.dinky</groupId>
 <artifactId>dinky-cdc-core</artifactId>
@@ -1057,6 +1072,13 @@
+
+ flink-1.19
+
+ 1.19
+
+
+
flink-single-version