Skip to content

Commit

Permalink
[Feature-1930][dlink-client] Analyze custom functions from Flink SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
aiwenmo committed Nov 6, 2023
1 parent ded5ae6 commit a787c46
Show file tree
Hide file tree
Showing 32 changed files with 1,012 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,21 @@ public interface Parser {
* @throws SqlParserException if an exception is thrown when parsing the statement
*/
SqlNode parseExpression(String sqlExpression);

/**
 * Parses the given SQL statement and returns the resulting abstract syntax tree.
 *
 * @param statement the SQL statement to parse
 * @return the abstract syntax tree of the statement
 * @throws org.apache.flink.table.api.SqlParserException if the statement cannot be parsed
 */
SqlNode parseSql(String statement);

/**
 * Validates a parsed SQL node.
 *
 * @param sqlNode the parsed {@link SqlNode} to validate
 * @return the validated {@link SqlNode}
 */
SqlNode validate(SqlNode sqlNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,18 @@ public CatalogManager getCatalogManager() {
public SqlNode parseExpression(String sqlExpression) {
return calciteParserSupplier.get().parseExpression(sqlExpression);
}

@Override
public SqlNode parseSql(String statement) {
    // Obtain a Calcite parser from the supplier and return the parsed syntax tree.
    return calciteParserSupplier.get().parse(statement);
}

@Override
public SqlNode validate(SqlNode sqlNode) {
    // Validation is delegated to the planner handed out by the validator supplier.
    return validatorSupplier.get().validate(sqlNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public <T> void createTemporaryView(String path, DataStream<T> dataStream, Strin
@Override
public List<LineageRel> getLineage(String statement) {
LineageContext lineageContext = new LineageContext((TableEnvironmentImpl) streamTableEnvironment);
return lineageContext.getLineage(statement);
return lineageContext.analyzeLineage(statement);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.utils;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.util.SqlBasicVisitor;
import org.apache.flink.table.catalog.UnresolvedIdentifier;

import java.util.ArrayList;
import java.util.List;

/**
* @description: FunctionVisitor
* @author: HamaWhite
*/
public class FunctionVisitor extends SqlBasicVisitor<Void> {

private final List<UnresolvedIdentifier> functionList = new ArrayList<>();

@Override
public Void visit(SqlCall call) {
if (call instanceof SqlBasicCall && call.getOperator() instanceof SqlFunction) {
SqlFunction function = (SqlFunction) call.getOperator();
SqlIdentifier opName = function.getNameAsId();

functionList.add(UnresolvedIdentifier.of(opName.names));
}
return super.visit(call);
}

public List<UnresolvedIdentifier> getFunctionList() {
return functionList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,52 @@

package org.dinky.utils;

import org.dinky.data.model.FunctionResult;
import org.dinky.data.model.LineageRel;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelColumnOrigin;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* LineageContext
*
* @since 2022/8/6 11:06
*/
public class LineageContext {

private static final Logger LOG = LoggerFactory.getLogger(LineageContext.class);

private final TableEnvironmentImpl tableEnv;

/**
 * Creates a context that runs its analysis against the given table environment.
 *
 * @param tableEnv the table environment used by this context
 */
public LineageContext(TableEnvironmentImpl tableEnv) {
this.tableEnv = tableEnv;
}

public List<LineageRel> getLineage(String statement) {
public List<LineageRel> analyzeLineage(String statement) {
// 1. Generate original relNode tree
Tuple2<String, RelNode> parsed = parseStatement(statement);
String sinkTable = parsed.getField(0);
Expand Down Expand Up @@ -130,4 +142,49 @@ private List<LineageRel> buildFiledLineageResult(String sinkTable, RelNode optRe
}
return resultList;
}

/**
 * Collects the custom functions referenced by a SQL statement; system functions are left out.
 *
 * @param singleSql a single SQL statement to inspect
 * @return the set of custom functions found in the statement
 */
public Set<FunctionResult> analyzeFunction(String singleSql) {
    LOG.info("Analyze function Sql: \n {}", singleSql);

    // Build the syntax tree and validate it before inspecting it.
    ParserImpl sqlParser = (ParserImpl) tableEnv.getParser();
    SqlNode validatedNode = sqlParser.validate(sqlParser.parseSql(singleSql));

    // Walk the validated tree to gather every function call it contains.
    FunctionVisitor functionVisitor = new FunctionVisitor();
    validatedNode.accept(functionVisitor);

    // Keep only the functions that resolve to a catalog object identifier.
    Set<FunctionResult> customFunctions = new HashSet<>();
    functionVisitor.getFunctionList().forEach(candidate -> getFunctionCatalog()
            .lookupFunction(candidate)
            // built-in functions carry no object identifier, so they drop out here
            .flatMap(resolved -> resolved.getFunctionIdentifier().getIdentifier())
            .ifPresent(objectId -> {
                FunctionResult found = new FunctionResult()
                        .setCatalogName(objectId.getCatalogName())
                        .setDatabase(objectId.getDatabaseName())
                        .setFunctionName(objectId.getObjectName());
                LOG.debug("analyzed function: {}", found);
                customFunctions.add(found);
            }));
    return customFunctions;
}

/** Retrieves the function catalog from the planner's Flink context. */
private FunctionCatalog getFunctionCatalog() {
    return ((PlannerBase) tableEnv.getPlanner()).getFlinkContext().getFunctionCatalog();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import static org.junit.Assert.assertEquals;

import org.dinky.data.model.FunctionResult;
import org.dinky.data.model.LineageRel;

import org.apache.flink.configuration.Configuration;
Expand All @@ -30,8 +31,7 @@
import org.apache.flink.table.api.internal.TableEnvironmentImpl;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Set;

import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -43,7 +43,12 @@
*/
public class LineageContextTest {

private static final String CATALOG_NAME = "default_catalog";

private static final String DEFAULT_DATABASE = "default_database";

private static TableEnvironmentImpl tableEnv;

private static LineageContext context;

@BeforeClass
Expand Down Expand Up @@ -76,37 +81,47 @@ public void init() {
+ ") WITH ( "
+ " 'connector' = 'print' "
+ ")");
// Create custom function my_suffix_udf
tableEnv.executeSql("DROP FUNCTION IF EXISTS my_suffix_udf");
tableEnv.executeSql("CREATE FUNCTION IF NOT EXISTS my_suffix_udf " + "AS 'org.dinky.utils.MySuffixFunction'");
}

@Test
public void testGetLineage() {
List<LineageRel> actualList = context.getLineage("INSERT INTO TT select a||c A ,b||c B from ST");
public void testAnalyzeLineage() {
String sql = "INSERT INTO TT SELECT a||c A ,b||c B FROM ST";
String[][] expectedArray = {
{"ST", "a", "TT", "A", "||(a, c)"},
{"ST", "c", "TT", "A", "||(a, c)"},
{"ST", "b", "TT", "B", "||(b, c)"},
{"ST", "c", "TT", "B", "||(b, c)"}
};

List<LineageRel> expectedList = buildResult(expectedArray);
analyzeLineage(sql, expectedArray);
}

@Test
public void testAnalyzeLineageAndFunction() {
    final String statement = "INSERT INTO TT SELECT LOWER(a) , my_suffix_udf(b) FROM ST";

    // Each sink column is expected to derive from one source column through a function call.
    final String[][] expectedLineage = {
        {"ST", "a", "TT", "A", "LOWER(a)"},
        {"ST", "b", "TT", "B", "my_suffix_udf(b)"}
    };
    analyzeLineage(statement, expectedLineage);

    // Only the custom function is expected in the result; LOWER must not be reported.
    final String[] expectedFunctions = {"my_suffix_udf"};
    analyzeFunction(statement, expectedFunctions);
}

/** Asserts that the lineage analyzed from {@code sql} equals the rows built from {@code expectedArray}. */
private void analyzeLineage(String sql, String[][] expectedArray) {
    List<LineageRel> analyzed = context.analyzeLineage(sql);
    assertEquals(LineageRel.build(CATALOG_NAME, DEFAULT_DATABASE, expectedArray), analyzed);
}

private List<LineageRel> buildResult(String[][] expectedArray) {
return Stream.of(expectedArray)
.map(e -> {
String transform = e.length == 5 ? e[4] : null;
return new LineageRel(
"default_catalog",
"default_database",
e[0],
e[1],
"default_catalog",
"default_database",
e[2],
e[3],
transform);
})
.collect(Collectors.toList());
/** Asserts that the functions analyzed from {@code sql} equal the set built from {@code expectedArray}. */
private void analyzeFunction(String sql, String[] expectedArray) {
    Set<FunctionResult> analyzed = context.analyzeFunction(sql);
    assertEquals(FunctionResult.build(CATALOG_NAME, DEFAULT_DATABASE, expectedArray), analyzed);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.dinky.utils;

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Scalar function used by the tests; appends the fixed suffix {@code -HamaWhite} to its input.
 *
 * @author HamaWhite
 */
public class MySuffixFunction extends ScalarFunction {

    /**
     * Appends {@code -HamaWhite} to the given value.
     *
     * @param input the value to extend; may be {@code null}
     * @return the suffixed value, or {@code null} when {@code input} is {@code null}
     */
    public String eval(String input) {
        // A null argument would make concat throw a NullPointerException; propagate null instead.
        if (input == null) {
            return null;
        }
        return input.concat("-HamaWhite");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,21 @@ public interface Parser {
* @throws SqlParserException if an exception is thrown when parsing the statement
*/
SqlNode parseExpression(String sqlExpression);

/**
 * Parses the given SQL statement and returns the resulting abstract syntax tree.
 *
 * @param statement the SQL statement to parse
 * @return the abstract syntax tree of the statement
 * @throws org.apache.flink.table.api.SqlParserException if the statement cannot be parsed
 */
SqlNode parseSql(String statement);

/**
 * Validates a parsed SQL node.
 *
 * @param sqlNode the parsed {@link SqlNode} to validate
 * @return the validated {@link SqlNode}
 */
SqlNode validate(SqlNode sqlNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,18 @@ public CatalogManager getCatalogManager() {
public SqlNode parseExpression(String sqlExpression) {
return calciteParserSupplier.get().parseExpression(sqlExpression);
}

@Override
public SqlNode parseSql(String statement) {
    // Obtain a Calcite parser from the supplier and return the parsed syntax tree.
    return calciteParserSupplier.get().parse(statement);
}

@Override
public SqlNode validate(SqlNode sqlNode) {
    // Validation is delegated to the planner handed out by the validator supplier.
    return validatorSupplier.get().validate(sqlNode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public <T> void createTemporaryView(String path, DataStream<T> dataStream, Strin
@Override
public List<LineageRel> getLineage(String statement) {
LineageContext lineageContext = new LineageContext((TableEnvironmentImpl) streamTableEnvironment);
return lineageContext.getLineage(statement);
return lineageContext.analyzeLineage(statement);
}

@Override
Expand Down
Loading

0 comments on commit a787c46

Please sign in to comment.