[spark] Introduce SparkGenericCatalog (apache#1403)
JingsongLi authored Jun 20, 2023
1 parent a5570d8 commit 251e190
Showing 10 changed files with 576 additions and 3 deletions.
3 changes: 3 additions & 0 deletions LICENSE
@@ -232,6 +232,9 @@ paimon-common/src/main/java/org/apache/paimon/types/DataType.java
paimon-common/src/main/java/org/apache/paimon/options/ConfigOption.java
from http://flink.apache.org/ version 1.17.0

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
from http://iceberg.apache.org/ version 1.3.0

MIT License
-----------

3 changes: 3 additions & 0 deletions NOTICE
@@ -10,6 +10,9 @@ Copyright 2014-2023 The Apache Software Foundation
Apache Hadoop
Copyright 2006 and onwards The Apache Software Foundation.

Apache Iceberg
Copyright 2017-2022 The Apache Software Foundation

Flink Connector for Apache Doris
Copyright 2018-2023 The Apache Software Foundation

47 changes: 46 additions & 1 deletion docs/content/engines/spark3.md
@@ -88,6 +88,10 @@ Alternatively, you can copy `paimon-spark-3.3-{{< version >}}.jar` under `spark/

**Step 2: Specify Paimon Catalog**

{{< tabs "Specify Paimon Catalog" >}}

{{< tab "Catalog" >}}

When starting `spark-sql`, use the following command to register Paimon’s Spark catalog with the name `paimon`. Table files of the warehouse are stored under `/tmp/paimon`.

```bash
spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=file:/tmp/paimon
```

After the `spark-sql` command line has started, run the following SQL to create and switch to database `default`.

```sql
USE paimon;
CREATE DATABASE default;
USE default;
```

After switching to the catalog (`USE paimon`), Spark's existing tables are not directly accessible; use `spark_catalog.${database_name}.${table_name}` to access them.
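
For example, to read an existing Spark table `my_spark_table` in the `default` database (a hypothetical name):

```sql
-- The Paimon catalog is active; address the Spark session catalog explicitly:
SELECT * FROM spark_catalog.default.my_spark_table;
```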

{{< /tab >}}

{{< tab "Generic Catalog" >}}

When starting `spark-sql`, use the following command to register Paimon’s Spark generic catalog in place of the default Spark catalog `spark_catalog`. (The default warehouse is Spark's `spark.sql.warehouse.dir`.)

```bash
spark-sql ... \
--conf spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog
```

With `SparkGenericCatalog`, you can use Paimon tables in this catalog as well as non-Paimon tables such as Spark's CSV, Parquet, or Hive tables.
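
For example, a sketch mirroring this commit's tests (table names are illustrative):

```sql
-- A Paimon table and a plain Spark CSV table coexist in spark_catalog:
CREATE TABLE paimon_t (k INT, v STRING) USING paimon TBLPROPERTIES ('primary-key' = 'k');
CREATE TABLE csv_t (k INT, v STRING) USING csv;
```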

{{< /tab >}}

{{< /tabs >}}

**Step 3: Create a Table and Write Some Records**

{{< tabs "Create Paimon Table" >}}

{{< tab "Catalog" >}}

```sql
create table my_table (
k int,
v string
) tblproperties (
'primary-key' = 'k'
);

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
```

{{< /tab >}}

{{< tab "Generic Catalog" >}}

```sql
create table my_table (
k int,
v string
) USING paimon tblproperties (
'primary-key' = 'k'
);

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
```

{{< /tab >}}

{{< /tabs >}}

**Step 4: Query Table with SQL**

```sql
SELECT * FROM my_table;
```

7 changes: 7 additions & 0 deletions paimon-spark/paimon-spark-3.1/pom.xml
@@ -57,6 +57,13 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
118 changes: 118 additions & 0 deletions .../org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link SparkGenericCatalog}. */
public class SparkGenericCatalogTest {

protected static SparkSession spark = null;

protected static Path warehousePath = null;

@BeforeAll
public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
warehousePath = new Path("file:" + tempDir.toString());
spark =
SparkSession.builder()
.config("spark.sql.warehouse.dir", warehousePath.toString())
.master("local[2]")
.getOrCreate();
spark.conf().set("spark.sql.catalog.spark_catalog", SparkGenericCatalog.class.getName());
}

@AfterAll
public static void stopMetastoreAndSpark() {
if (spark != null) {
spark.stop();
spark = null;
}
}

@Test
public void testPaimonTable() throws Exception {
spark.sql(
"CREATE TABLE PT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");
writeTable(
"PT",
GenericRow.of(1, 2, BinaryString.fromString("3")),
GenericRow.of(4, 5, BinaryString.fromString("6")));
assertThat(spark.sql("SELECT * FROM PT").collectAsList().stream().map(Object::toString))
.containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");

spark.sql("CREATE DATABASE my_db");
spark.sql(
"CREATE TABLE DB_PT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");
writeTable(
"DB_PT",
GenericRow.of(1, 2, BinaryString.fromString("3")),
GenericRow.of(4, 5, BinaryString.fromString("6")));
assertThat(spark.sql("SELECT * FROM DB_PT").collectAsList().stream().map(Object::toString))
.containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");

assertThat(spark.sql("SHOW NAMESPACES").collectAsList().stream().map(Object::toString))
.containsExactlyInAnyOrder("[default]", "[my_db]");
}

@Test
public void testCsvTable() {
spark.sql("CREATE TABLE CT (a INT, b INT, c STRING) USING csv");
spark.sql("INSERT INTO CT VALUES (1, 2, '3'), (4, 5, '6')").collectAsList();
List<Row> rows = spark.sql("SELECT * FROM CT").collectAsList();
assertThat(rows.stream().map(Object::toString))
.containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
}

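/** Writes rows directly through Paimon's batch write API, bypassing Spark SQL. */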
private static void writeTable(String tableName, GenericRow... rows) throws Exception {
FileStoreTable fileStoreTable =
FileStoreTableFactory.create(
LocalFileIO.create(),
new Path(warehousePath, String.format("default.db/%s", tableName)));
BatchWriteBuilder writeBuilder = fileStoreTable.newBatchWriteBuilder();
BatchTableWrite writer = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit();
for (GenericRow row : rows) {
writer.write(row);
}
commit.commit(writer.prepareCommit());
writer.close();
}
}
28 changes: 28 additions & 0 deletions .../log4j2-test.properties
@@ -0,0 +1,28 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
@@ -79,6 +79,10 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
Options.fromMap(options),
SparkSession.active().sessionState().newHadoopConf());
this.catalog = CatalogFactory.createCatalog(catalogContext);
try {
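// Eagerly create the default namespace; ignore the error if it already exists.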
createNamespace(defaultNamespace(), new HashMap<>());
} catch (NamespaceAlreadyExistsException ignored) {
}
}

@Override
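
A brief illustration of the effect (an assumption based on this hunk and the test's `SHOW NAMESPACES` assertion): the default namespace is created eagerly during `initialize`, so it is listed without any manual `CREATE DATABASE`:

```sql
SHOW NAMESPACES;  -- includes [default] right after the catalog is initialized
```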