Skip to content

Commit

Permalink
[spark] Supports parser of Spark call procedure command (apache#1785)
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas authored Aug 25, 2023
1 parent 2255898 commit 7a61984
Show file tree
Hide file tree
Showing 25 changed files with 1,542 additions and 19 deletions.
9 changes: 7 additions & 2 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,15 @@ paimon-common/src/main/java/org/apache/paimon/types/DataType.java
paimon-common/src/main/java/org/apache/paimon/options/ConfigOption.java
from http://flink.apache.org/ version 1.17.0

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
paimon-core/src/main/java/org/apache/paimon/utils/ZOrderByteUtils.java
paimon-core/src/test/java/org/apache/paimon/utils/TestZOrderByteUtil.java
paimon-hive/paimon-hive-common/src/test/java/org/apache/paimon/hive/TestHiveMetastore.java
paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/CoerceArguments.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveProcedures.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSparkSqlExtensionsParser.scala
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
from http://iceberg.apache.org/ version 1.3.0

paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
Expand Down
18 changes: 18 additions & 0 deletions paimon-spark/paimon-spark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ under the License.
</dependencies>

<build>

<pluginManagement>
<plugins>
<plugin>
Expand Down Expand Up @@ -276,6 +277,23 @@ under the License.
</scala>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>${antlr4-maven-plugin.version}</version>
<executions>
<execution>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
<configuration>
<visitor>true</visitor>
<listener>true</listener>
<sourceDirectory>src/main/antlr4</sourceDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file is an adaptation of Spark's grammar files.
*/

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */

grammar PaimonSqlExtensions;

@lexer::members {
/**
* Verify whether current token is a valid decimal token (which contains dot).
* Returns true if the character that follows the token is not a digit or letter or underscore.
*
* For example:
* For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
* For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
* For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
* For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is followed
* by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
* which is not a digit or letter or underscore.
*/
public boolean isValidDecimal() {
int nextChar = _input.LA(1);
if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
nextChar == '_') {
return false;
} else {
return true;
}
}

/**
* This method will be called when we see '/*' and try to match it as a bracketed comment.
* If the next character is '+', it should be parsed as hint later, and we cannot match
* it as a bracketed comment.
*
* Returns true if the next character is '+'.
*/
public boolean isHint() {
int nextChar = _input.LA(1);
if (nextChar == '+') {
return true;
} else {
return false;
}
}
}

singleStatement
: statement ';'* EOF
;

statement
: CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call
;

callArgument
: expression #positionalArgument
| identifier '=>' expression #namedArgument
;

expression
: constant
| stringMap
;

constant
: number #numericLiteral
| booleanValue #booleanLiteral
| STRING+ #stringLiteral
| identifier STRING #typeConstructor
;

stringMap
: MAP '(' constant (',' constant)* ')'
;

booleanValue
: TRUE | FALSE
;

number
: MINUS? EXPONENT_VALUE #exponentLiteral
| MINUS? DECIMAL_VALUE #decimalLiteral
| MINUS? INTEGER_VALUE #integerLiteral
| MINUS? BIGINT_LITERAL #bigIntLiteral
| MINUS? SMALLINT_LITERAL #smallIntLiteral
| MINUS? TINYINT_LITERAL #tinyIntLiteral
| MINUS? DOUBLE_LITERAL #doubleLiteral
| MINUS? FLOAT_LITERAL #floatLiteral
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;

multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;

identifier
: IDENTIFIER #unquotedIdentifier
| quotedIdentifier #quotedIdentifierAlternative
| nonReserved #unquotedIdentifier
;

quotedIdentifier
: BACKQUOTED_IDENTIFIER
;

nonReserved
: CALL
| TRUE | FALSE
| MAP
;

CALL: 'CALL';

TRUE: 'TRUE';
FALSE: 'FALSE';

MAP: 'MAP';

PLUS: '+';
MINUS: '-';

STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
| '"' ( ~('"'|'\\') | ('\\' .) )* '"'
;

BIGINT_LITERAL
: DIGIT+ 'L'
;

SMALLINT_LITERAL
: DIGIT+ 'S'
;

TINYINT_LITERAL
: DIGIT+ 'Y'
;

INTEGER_VALUE
: DIGIT+
;

EXPONENT_VALUE
: DIGIT+ EXPONENT
| DECIMAL_DIGITS EXPONENT {isValidDecimal()}?
;

DECIMAL_VALUE
: DECIMAL_DIGITS {isValidDecimal()}?
;

FLOAT_LITERAL
: DIGIT+ EXPONENT? 'F'
| DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}?
;

DOUBLE_LITERAL
: DIGIT+ EXPONENT? 'D'
| DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
;

BIGDECIMAL_LITERAL
: DIGIT+ EXPONENT? 'BD'
| DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
;

IDENTIFIER
: (LETTER | DIGIT | '_')+
;

BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;

fragment DECIMAL_DIGITS
: DIGIT+ '.' DIGIT*
| '.' DIGIT+
;

fragment EXPONENT
: 'E' [+-]? DIGIT+
;

fragment DIGIT
: [0-9]
;

fragment LETTER
: [A-Z]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;

BRACKETED_COMMENT
: '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? '*/' -> channel(HIDDEN)
;

WS
: [ \r\n\t]+ -> channel(HIDDEN)
;

// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
UNRECOGNIZED
: .
;
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.analysis.NoSuchProcedureException;
import org.apache.paimon.spark.catalog.ProcedureCatalog;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
Expand Down Expand Up @@ -58,7 +62,7 @@
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;

/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog implements TableCatalog, SupportsNamespaces {
public class SparkCatalog implements TableCatalog, ProcedureCatalog, SupportsNamespaces {

private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);

Expand Down Expand Up @@ -315,6 +319,17 @@ public boolean dropTable(Identifier ident) {
}
}

@Override
public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
if (isValidateNamespace(identifier.namespace())) {
ProcedureBuilder builder = SparkProcedures.newBuilder(name);
if (builder != null) {
return builder.withTableCatalog(this).build();
}
}
throw new NoSuchProcedureException(identifier);
}

private SchemaChange toSchemaChange(TableChange change) {
if (change instanceof TableChange.SetProperty) {
TableChange.SetProperty set = (TableChange.SetProperty) change;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark;

import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;

import org.apache.hadoop.shaded.com.google.common.collect.ImmutableMap;

import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

/** The {@link Procedure}s including all the stored procedures. */
public class SparkProcedures {

private static final Map<String, Supplier<ProcedureBuilder>> BUILDERS = initProcedureBuilders();

private SparkProcedures() {}

public static ProcedureBuilder newBuilder(String name) {
Supplier<ProcedureBuilder> builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT));
return builderSupplier != null ? builderSupplier.get() : null;
}

private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
ImmutableMap.Builder<String, Supplier<ProcedureBuilder>> procedureBuilders =
ImmutableMap.builder();
return procedureBuilders.build();
}
}
Loading

0 comments on commit 7a61984

Please sign in to comment.