branch action

herefree committed Nov 9, 2024
1 parent e9d45a5 commit c397acd
Showing 6 changed files with 250 additions and 19 deletions.
org/apache/spark/sql/connector/catalog/ViewCatalog.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

/**
 * Dummy placeholder to resolve the compatibility issue of Spark's ViewCatalog interface, which
 * is not available in older Spark versions.
*/
public interface ViewCatalog {
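// Intentionally left empty: the placeholder only needs to exist on the classpath so that
// catalog classes referencing ViewCatalog still compile and load on such Spark versions.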
}
org/apache/paimon/spark/SparkCatalog.java
@@ -28,6 +28,7 @@
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
import org.apache.paimon.spark.catalog.SupportFunction;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.view.ViewImpl;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -316,6 +317,7 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
try {
System.out.println("111111");
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if ((!usePaimon(provider))
&& SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
@@ -567,13 +569,6 @@ protected List<String> convertPartitionTransforms(Transform[] transforms) {
return partitionColNames;
}

@Override
public Identifier[] listViews(String... namespace) throws NoSuchNamespaceException {
checkArgument(
@@ -591,26 +586,79 @@ public Identifier[] listViews(String... namespace) throws NoSuchNamespaceException {

@Override
public View loadView(Identifier identifier) throws NoSuchViewException {
try {
org.apache.paimon.view.View view = catalog.getView(toIdentifier(identifier));
return new SparkView(catalogName, view);
} catch (Catalog.ViewNotExistException | NoSuchTableException e) {
throw new NoSuchViewException(identifier);
}
}

@Override
public View createView(
Identifier identifier,
String sql,
String currentCatalog,
String[] currentNamespace,
StructType schema,
String[] queryColumnNames,
String[] columnAliases,
String[] columnComments,
Map<String, String> properties)
throws ViewAlreadyExistsException, NoSuchNamespaceException {
System.out.println("asdxxx");
try {
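// Convert the Spark view schema into a Paimon RowType, preserving nullability and field comments.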
org.apache.paimon.types.RowType.Builder builder =
org.apache.paimon.types.RowType.builder();
Arrays.stream(schema.fields())
.forEach(
field ->
builder.field(
field.name(),
toPaimonType(field.dataType()).copy(field.nullable()),
field.getComment().getOrElse(() -> null)));
org.apache.paimon.view.View view =
new ViewImpl(
toIdentifier(identifier),
builder.build(),
sql,
properties.getOrDefault(TableCatalog.PROP_COMMENT, null),
properties);
catalog.createView(toIdentifier(identifier), view, false);
return loadView(identifier);
} catch (Catalog.ViewAlreadyExistException e) {
throw new ViewAlreadyExistsException(identifier);
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
} catch (Catalog.DatabaseNotExistException | NoSuchViewException e) {
throw new NoSuchNamespaceException(identifier.namespace());
}
}

@Override
public boolean dropView(Identifier identifier) {
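// Per the Spark API contract, a missing view is reported by returning false rather than throwing.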
try {
catalog.dropView(toIdentifier(identifier), false);
return true;
} catch (Catalog.ViewNotExistException | NoSuchTableException e) {
return false;
}
}

// --------------------- unsupported methods ----------------------------

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes) {
throw new UnsupportedOperationException("Alter namespace in Spark is not supported yet.");
}

@Override
public View alterView(Identifier identifier, ViewChange... viewChanges)
throws NoSuchViewException, IllegalArgumentException {
throw new UnsupportedOperationException("Alter view in Spark is not supported yet.");
}

@Override
public void renameView(Identifier oldIdent, Identifier newIdent)
throws NoSuchViewException, ViewAlreadyExistsException {
throw new UnsupportedOperationException("Rename view in Spark is not supported yet.");
}
}
org/apache/paimon/spark/SparkView.java
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark;

import org.apache.paimon.types.DataField;

import org.apache.spark.sql.connector.catalog.View;
import org.apache.spark.sql.types.StructType;

import java.util.Map;

/** Spark {@link View} for paimon. */
public class SparkView implements View {

public static final String QUERY_COLUMN_NAMES = "spark.query-column-names";

private final String catalogName;

private final org.apache.paimon.view.View paimonView;

public SparkView(String catalogName, org.apache.paimon.view.View paimonView) {
this.paimonView = paimonView;
this.catalogName = catalogName;
}

@Override
public String name() {
return paimonView.fullName();
}

@Override
public String query() {
return paimonView.query();
}

@Override
public String currentCatalog() {
return catalogName;
}

@Override
public String[] currentNamespace() {
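// A Paimon view's full name has the form "database.view", so the namespace is its first segment.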
return new String[] {paimonView.fullName().split("\\.")[0]};
}

@Override
public StructType schema() {
return SparkTypeUtils.fromPaimonRowType(paimonView.rowType());
}

@Override
public String[] queryColumnNames() {
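// The original query's column names, when recorded, are stored as a comma-separated view option.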
return paimonView.options().containsKey(QUERY_COLUMN_NAMES)
? paimonView.options().get(QUERY_COLUMN_NAMES).split(",")
: new String[0];
}

@Override
public String[] columnAliases() {
return paimonView.rowType().getFields().stream()
.map(DataField::name)
.toArray(String[]::new);
}

@Override
public String[] columnComments() {
return paimonView.rowType().getFields().stream()
.map(DataField::description)
.toArray(String[]::new);
}

@Override
public Map<String, String> properties() {
return paimonView.options();
}
}
@@ -122,14 +122,15 @@ public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
"CREATE TABLE IF NOT EXISTS t1 (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");

assertThat(spark.sql("SHOW NAMESPACES").collectAsList().stream().map(Object::toString))
.containsExactlyInAnyOrder("[default]", "[my_db]", "[my_db1]");

assertThat(
spark.sql("SHOW TABLES").collectAsList().stream()
.map(s -> s.get(1))
.map(Object::toString))
.containsExactlyInAnyOrder("t1");
spark.sql("create view p_view as select * from t1");
spark.close();

// secondly, we close catalog with hive metastore, and start a filesystem metastore to check
org/apache/paimon/spark/SparkViewTest.java
@@ -0,0 +1,65 @@
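/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
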
package org.apache.paimon.spark;

import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.TestHiveMetastore;

import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import static org.assertj.core.api.Assertions.assertThat;

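/** Test for Spark view support with a hive metastore. */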
public class SparkViewTest {
private static TestHiveMetastore testHiveMetastore;

private static final int PORT = 9087;

@BeforeAll
public static void startMetastore() {
testHiveMetastore = new TestHiveMetastore();
testHiveMetastore.start(PORT);
}

@AfterAll
public static void closeMetastore() throws Exception {
testHiveMetastore.stop();
}

@Test
public void testCreateAndQueryView(@TempDir java.nio.file.Path tempDir) {
// firstly, we use hive metastore to create a table, then build a view on top of it
Path warehousePath = new Path("file:" + tempDir.toString());
SparkSession spark =
SparkSession.builder()
.config("spark.sql.warehouse.dir", warehousePath.toString())
// with case-sensitive false
.config("spark.sql.caseSensitive", "false")
// with hive metastore
.config("spark.sql.catalogImplementation", "hive")
.config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
.master("local[2]")
.getOrCreate();

spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
spark.sql("USE spark_catalog.my_db1");
spark.sql(
"CREATE TABLE db_pt (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");
spark.sql("INSERT INTO db_pt VALUES (1, 2, '3'), (4, 5, '6')");
spark.sql("CREATE VIEW spark_view AS SELECT a, b FROM db_pt");

// the view should be visible in SHOW VIEWS and return the projected rows when queried
assertThat(
spark.sql("SHOW VIEWS").collectAsList().stream()
.map(s -> s.get(1))
.map(Object::toString))
.contains("spark_view");
assertThat(
spark.sql("SELECT * FROM spark_view").collectAsList().stream()
.map(Object::toString))
.containsExactlyInAnyOrder("[1,2]", "[4,5]");

spark.close();
}
}
DDLWithHiveCatalogTestBase.scala
@@ -48,7 +48,6 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
Assertions.assertEquals(
getTableLocation("paimon_db.paimon_tbl"),
s"${dBLocation.getCanonicalPath}/paimon_tbl")

val fileStoreTable = getPaimonScan("SELECT * FROM paimon_db.paimon_tbl").table
.asInstanceOf[FileStoreTable]
Assertions.assertEquals("paimon_tbl", fileStoreTable.name())
