Skip to content

Commit

Permalink
first version
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Nov 18, 2024
1 parent 9b281bc commit afe5ec4
Show file tree
Hide file tree
Showing 22 changed files with 895 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalog.SupportView
import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedView}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}

case class RewritePaimonViewCommands(spark: SparkSession)
extends Rule[LogicalPlan]
with LookupCatalog {

protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {

case CreateViewStatement(
ResolvedIdent(resolved),
userSpecifiedColumns,
comment,
properties,
Some(originalText),
child,
allowExisting,
replace,
_) =>
CreatePaimonView(
child = resolved,
queryText = originalText,
query = CTESubstitution.apply(child),
columnAliases = userSpecifiedColumns.map(_._1),
columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace
)

case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
DropPaimonView(resolved, ifExists)

case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
ShowPaimonViews(
ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
pattern,
output)
}

private object ResolvedIdent {
def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
case CatalogAndIdentifier(viewCatalog: SupportView, ident) =>
Some(ResolvedIdentifier(viewCatalog, ident))
case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) =>
Some(ResolvedIdentifier(viewCatalog, ident))
case _ =>
None
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PaimonViewTest extends PaimonViewTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.parser.extensions

import org.apache.paimon.spark.catalog.SupportView
import org.apache.paimon.spark.catalyst.plans.logical.{CreatePaimonView, DropPaimonView, ResolvedIdentifier, ShowPaimonViews}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, ResolvedNamespace, UnresolvedDBObjectName, UnresolvedView}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}

case class RewritePaimonViewCommands(spark: SparkSession)
extends Rule[LogicalPlan]
with LookupCatalog {

protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {

case CreateView(
ResolvedIdent(resolved),
userSpecifiedColumns,
comment,
properties,
Some(queryText),
query,
allowExisting,
replace) =>
CreatePaimonView(
child = resolved,
queryText = queryText,
query = CTESubstitution.apply(query),
columnAliases = userSpecifiedColumns.map(_._1),
columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace
)

case DropView(ResolvedIdent(resolved), ifExists: Boolean) =>
DropPaimonView(resolved, ifExists)

case ShowViews(_, pattern, output) if catalogManager.currentCatalog.isInstanceOf[SupportView] =>
ShowPaimonViews(
ResolvedNamespace(catalogManager.currentCatalog, catalogManager.currentNamespace),
pattern,
output)
}

private object ResolvedIdent {
def unapply(unresolved: Any): Option[ResolvedIdentifier] = unresolved match {
case UnresolvedDBObjectName(CatalogAndIdentifier(viewCatalog: SupportView, ident), _) =>
Some(ResolvedIdentifier(viewCatalog, ident))
case UnresolvedView(CatalogAndIdentifier(viewCatalog: SupportView, ident), _, _, _) =>
Some(ResolvedIdentifier(viewCatalog, ident))
case _ =>
None
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PaimonViewTest extends PaimonViewTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PaimonViewTest extends PaimonViewTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PaimonViewTest extends PaimonViewTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PaimonViewTest extends PaimonViewTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
import org.apache.paimon.spark.catalog.SupportFunction;
import org.apache.paimon.spark.catalog.SupportView;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTableOptions;

Expand Down Expand Up @@ -72,10 +73,12 @@
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {

private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);

Expand Down Expand Up @@ -126,10 +129,7 @@ public String[] defaultNamespace() {
@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
checkArgument(
isValidateNamespace(namespace),
"Namespace %s is not valid",
Arrays.toString(namespace));
checkNamespace(namespace);
try {
catalog.createDatabase(namespace[0], false, metadata);
} catch (Catalog.DatabaseAlreadyExistException e) {
Expand All @@ -152,9 +152,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep
if (namespace.length == 0) {
return listNamespaces();
}
if (!isValidateNamespace(namespace)) {
throw new NoSuchNamespaceException(namespace);
}
checkNamespace(namespace);
try {
catalog.getDatabase(namespace[0]);
return new String[0][];
Expand All @@ -166,10 +164,7 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceExcep
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
checkArgument(
isValidateNamespace(namespace),
"Namespace %s is not valid",
Arrays.toString(namespace));
checkNamespace(namespace);
String dataBaseName = namespace[0];
try {
return catalog.getDatabase(dataBaseName).options();
Expand Down Expand Up @@ -207,10 +202,7 @@ public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException
*/
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
checkArgument(
isValidateNamespace(namespace),
"Namespace %s is not valid",
Arrays.toString(namespace));
checkNamespace(namespace);
try {
catalog.dropDatabase(namespace[0], false, cascade);
return true;
Expand All @@ -224,10 +216,7 @@ public boolean dropNamespace(String[] namespace, boolean cascade)

@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
checkArgument(
isValidateNamespace(namespace),
"Missing database in namespace: %s",
Arrays.toString(namespace));
checkNamespace(namespace);
try {
return catalog.listTables(namespace[0]).stream()
.map(table -> Identifier.of(namespace, table))
Expand All @@ -239,10 +228,7 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti

@Override
public void invalidateTable(Identifier ident) {
try {
catalog.invalidateTable(toIdentifier(ident));
} catch (NoSuchTableException ignored) {
}
catalog.invalidateTable(toIdentifier(ident));
}

@Override
Expand Down Expand Up @@ -347,7 +333,7 @@ public boolean dropTable(Identifier ident) {
try {
catalog.dropTable(toIdentifier(ident), false);
return true;
} catch (Catalog.TableNotExistException | NoSuchTableException e) {
} catch (Catalog.TableNotExistException e) {
return false;
}
}
Expand Down Expand Up @@ -454,10 +440,6 @@ private void validateAlterProperty(String alterKey) {
}
}

private boolean isValidateNamespace(String[] namespace) {
return namespace.length == 1;
}

@Override
public void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException {
Expand All @@ -472,15 +454,6 @@ public void renameTable(Identifier oldIdent, Identifier newIdent)

// --------------------- tools ------------------------------------------

protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident)
throws NoSuchTableException {
if (!isValidateNamespace(ident.namespace())) {
throw new NoSuchTableException(ident);
}

return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
}

protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
try {
Expand Down
Loading

0 comments on commit afe5ec4

Please sign in to comment.