Skip to content

Commit

Permalink
[hive][spark] Support creating external table without schema when the…
Browse files Browse the repository at this point in the history
… table already exists (#4638)
  • Loading branch information
Zouxxyy authored Dec 10, 2024
1 parent 3691419 commit c6387a6
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 76 deletions.
29 changes: 28 additions & 1 deletion docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,33 @@ CREATE TABLE my_table (
);
```

### Create External Table

When the catalog's `metastore` type is `hive`, if the `location` is specified when creating a table, that table will be considered an external table; otherwise, it will be a managed table.

When you drop an external table, only the metadata in Hive will be removed, and the actual data files will not be deleted; whereas dropping a managed table will also delete the data.

```sql
CREATE TABLE my_table (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (
'primary-key' = 'dt,hh,user_id'
) LOCATION '/path/to/table';
```

Furthermore, if there is already data stored in the specified location, you can create the table without explicitly specifying the fields, partitions and props or other information.
In this case, the new table will inherit them all from the existing table’s metadata.

However, if you manually specify them, you need to ensure that they are consistent with those of the existing table (props can be a subset). Therefore, it is strongly recommended not to specify them.

```sql
CREATE TABLE my_table LOCATION '/path/to/table';
```

### Create Table As Select

Table can be created and populated by the results of a query, for example, we have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
Expand Down Expand Up @@ -241,7 +268,7 @@ DROP VIEW v1;
```

## Tag
### Create or Replace Tag
### Create Or Replace Tag
Create or replace a tag syntax with the following options.
- Create a tag with or without the snapshot id and time retention.
- Create an existed tag is not failed if using `IF NOT EXISTS` syntax.
Expand Down
31 changes: 20 additions & 11 deletions docs/content/spark/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,17 @@ TRUNCATE TABLE my_table;

## Update Table

spark supports update PrimitiveType and StructType, for example:
Updates the column values for the rows that match a predicate. When no predicate is provided, update the column values for all rows.

Note:

{{< hint info >}}

Update primary key columns is not supported when the target table is a primary key table.

{{< /hint >}}

Spark supports update PrimitiveType and StructType, for example:

```sql
-- Syntax
Expand All @@ -142,25 +152,29 @@ UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;

## Delete From Table

Deletes the rows that match a predicate. When no predicate is provided, deletes all rows.

```sql
DELETE FROM my_table WHERE currency = 'UNKNOWN';
```

## Merge Into Table

Paimon currently supports Merge Into syntax in Spark 3+, which allow a set of updates, insertions and deletions based on a source table in a single commit.
Merges a set of updates, insertions and deletions based on a source table into a target table.

Note:

{{< hint info >}}

In update clause, to update primary key columns is not supported when the target table is a primary key table.

{{< hint into >}}
1. In update clause, to update primary key columns is not supported.
2. `WHEN NOT MATCHED BY SOURCE` syntax is not supported.
{{< /hint >}}

**Example: One**

This is a simple demo that, if a row exists in the target table update it, else insert it.

```sql

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
Expand All @@ -170,15 +184,13 @@ WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

```

**Example: Two**

This is a demo with multiple, conditional clauses.

```sql

-- Here both source and target tables have the same schema: (a INT, b INT, c STRING), and a is a primary key.

MERGE INTO target
Expand All @@ -194,15 +206,12 @@ WHEN NOT MATCHED AND c > 'c9' THEN
INSERT (a, b, c) VALUES (a, b * 1.1, c) -- when not matched but meet the condition 3, then transform and insert this row;
WHEN NOT MATCHED THEN
INSERT * -- when not matched, insert this row without any transformation;

```

## Streaming Write

{{< hint info >}}

Paimon currently supports Spark 3+ for streaming write.

Paimon Structured Streaming only supports the two `append` and `complete` modes.

{{< /hint >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,24 +208,18 @@ public TableSchema createTable(Schema schema) throws Exception {
return createTable(schema, false);
}

public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception {
public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {
while (true) {
Optional<TableSchema> latest = latest();
if (latest.isPresent()) {
TableSchema oldSchema = latest.get();
boolean isSame =
Objects.equals(oldSchema.fields(), schema.fields())
&& Objects.equals(oldSchema.partitionKeys(), schema.partitionKeys())
&& Objects.equals(oldSchema.primaryKeys(), schema.primaryKeys())
&& Objects.equals(oldSchema.options(), schema.options());
if (ignoreIfExistsSame && isSame) {
return oldSchema;
TableSchema latestSchema = latest.get();
if (externalTable) {
checkSchemaForExternalTable(latestSchema, schema);
return latestSchema;
} else {
throw new IllegalStateException(
"Schema in filesystem exists, creation is not allowed.");
}

throw new IllegalStateException(
"Schema in filesystem exists, please use updating,"
+ " latest schema is: "
+ oldSchema);
}

List<DataField> fields = schema.fields();
Expand Down Expand Up @@ -254,6 +248,38 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws
}
}

private void checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
// When creating an external table, if the table already exists in the location, we can
// choose not to specify the fields.
if (newSchema.fields().isEmpty()
// When the fields are explicitly specified, we need check for consistency.
|| (Objects.equals(existsSchema.fields(), newSchema.fields())
&& Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
&& Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
// check for options
Map<String, String> existsOptions = existsSchema.options();
Map<String, String> newOptions = newSchema.options();
newOptions.forEach(
(key, value) -> {
if (!key.equals(Catalog.OWNER_PROP)
&& (!existsOptions.containsKey(key)
|| !existsOptions.get(key).equals(value))) {
throw new RuntimeException(
"New schema's options are not equal to the exists schema's, new schema: "
+ newOptions
+ ", exists schema: "
+ existsOptions);
}
});
} else {
throw new RuntimeException(
"New schema is not equal to exists schema, new schema: "
+ newSchema
+ ", exists schema: "
+ existsSchema);
}
}

/** Update {@link SchemaChange}s. */
public TableSchema commitChanges(SchemaChange... changes) throws Exception {
return commitChanges(Arrays.asList(changes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,11 +720,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try {
tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
} catch (Exception e) {
throw new RuntimeException(
"Failed to commit changes of table "
+ identifier.getFullName()
+ " to underlying files.",
e);
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
}

try {
Expand All @@ -735,7 +731,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
identifier, tableSchema, location, externalTable)));
} catch (Exception e) {
try {
fileIO.deleteDirectoryQuietly(location);
if (!externalTable) {
fileIO.deleteDirectoryQuietly(location);
}
} catch (Exception ee) {
LOG.error("Delete directory[{}] fail for table {}", location, identifier, ee);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {
Expand Down Expand Up @@ -298,26 +297,8 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
try {
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if ((!usePaimon(provider))
&& SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
Map<String, String> newProperties = new HashMap<>(properties);
newProperties.put(TYPE.key(), FORMAT_TABLE.toString());
newProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
catalog.createTable(
toIdentifier(ident),
toInitialSchema(schema, partitions, newProperties),
false);
} else {
checkArgument(
usePaimon(provider),
"SparkCatalog can only create paimon table, but current provider is %s",
provider);
catalog.createTable(
toIdentifier(ident),
toInitialSchema(schema, partitions, properties),
false);
}
catalog.createTable(
toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
return loadTable(ident);
} catch (Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistsException(ident);
Expand Down Expand Up @@ -406,9 +387,12 @@ private static SchemaChange.Move getMove(
private Schema toInitialSchema(
StructType schema, Transform[] partitions, Map<String, String> properties) {
Map<String, String> normalizedProperties = new HashMap<>(properties);
if (!normalizedProperties.containsKey(TableCatalog.PROP_PROVIDER)) {
normalizedProperties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
String provider = properties.get(TableCatalog.PROP_PROVIDER);
if (!usePaimon(provider) && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
normalizedProperties.put(TYPE.key(), FORMAT_TABLE.toString());
normalizedProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
}
normalizedProperties.remove(TableCatalog.PROP_PROVIDER);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ object SparkSource {

val NAME = "paimon"

val FORMAT_NAMES = Seq("csv", "orc", "parquet")
val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet")

def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
new BaseRelation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
test("Paimon DDL: create table without using paimon") {
withTable("paimon_tbl") {
sql("CREATE TABLE paimon_tbl (id int)")
assert(loadTable("paimon_tbl").options().get("provider").equals("paimon"))
assert(!loadTable("paimon_tbl").options().containsKey("provider"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
spark.sql(
s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expertTbLocation'")
checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
assert(
loadTable("paimon_db", "external_tbl")
.location()
.toString
.split(':')
.apply(1)
.equals(expertTbLocation))
assert(getActualTableLocation("paimon_db", "external_tbl").equals(expertTbLocation))

// create managed table
spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
Expand Down Expand Up @@ -373,12 +367,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
spark.sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed")
checkAnswer(spark.sql("SELECT * FROM external_tbl_renamed"), Row(1))
assert(
loadTable("paimon_db", "external_tbl_renamed")
.location()
.toString
.split(':')
.apply(1)
.equals(expertTbLocation))
getActualTableLocation("paimon_db", "external_tbl_renamed").equals(
expertTbLocation))

// create managed table
spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")
Expand All @@ -389,12 +379,55 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
spark.sql("ALTER TABLE managed_tbl RENAME TO managed_tbl_renamed")
checkAnswer(spark.sql("SELECT * FROM managed_tbl_renamed"), Row(1))
assert(
!loadTable("paimon_db", "managed_tbl_renamed")
.location()
.toString
.split(':')
.apply(1)
.equals(managedTbLocation.toString))
!getActualTableLocation("paimon_db", "managed_tbl_renamed").equals(
managedTbLocation.toString))
}
}
}
}
}

test("Paimon DDL with hive catalog: create external table without schema") {
Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
spark.sql(s"USE $catalogName")
withTempDir {
tbLocation =>
withDatabase("paimon_db") {
spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
spark.sql(s"USE paimon_db")
withTable("t1", "t2", "t3", "t4", "t5") {
val expertTbLocation = tbLocation.getCanonicalPath
spark.sql(s"""
|CREATE TABLE t1 (id INT, pt INT) USING paimon
|PARTITIONED BY (pt)
|TBLPROPERTIES('primary-key' = 'id', 'k1' = 'v1')
|LOCATION '$expertTbLocation'
|""".stripMargin)
spark.sql("INSERT INTO t1 VALUES (1, 1)")

// create table without schema
spark.sql(s"CREATE TABLE t2 USING paimon LOCATION '$expertTbLocation'")
checkAnswer(spark.sql("SELECT * FROM t2"), Row(1, 1))
assert(getActualTableLocation("paimon_db", "t2").equals(expertTbLocation))

// create table with wrong schema
intercept[Exception] {
spark.sql(
s"CREATE TABLE t3 (fake_col INT) USING paimon LOCATION '$expertTbLocation'")
}

// create table with exists props
spark.sql(
s"CREATE TABLE t4 USING paimon TBLPROPERTIES ('k1' = 'v1') LOCATION '$expertTbLocation'")
checkAnswer(spark.sql("SELECT * FROM t4"), Row(1, 1))
assert(getActualTableLocation("paimon_db", "t4").equals(expertTbLocation))

// create table with new props
intercept[Exception] {
spark.sql(
s"CREATE TABLE t5 USING paimon TBLPROPERTIES ('k2' = 'v2') LOCATION '$expertTbLocation'")
}
}
}
}
Expand Down Expand Up @@ -445,4 +478,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
.toMap
tableProps("path").split(":")(1)
}

def getActualTableLocation(dbName: String, tblName: String): String = {
loadTable(dbName, tblName).location().toString.split(':').apply(1)
}
}

0 comments on commit c6387a6

Please sign in to comment.