Commit 9767d5b: update
Zouxxyy committed Dec 10, 2024
1 parent 300cc67
Showing 6 changed files with 109 additions and 65 deletions.
File 1 of 6

@@ -208,26 +208,18 @@ public TableSchema createTable(Schema schema) throws Exception {
         return createTable(schema, false);
     }
 
-    public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws Exception {
+    public TableSchema createTable(Schema schema, boolean externalTable) throws Exception {
         while (true) {
             Optional<TableSchema> latest = latest();
             if (latest.isPresent()) {
-                TableSchema oldSchema = latest.get();
-                boolean isSame =
-                        Objects.equals(oldSchema.fields(), schema.fields())
-                                && Objects.equals(oldSchema.partitionKeys(), schema.partitionKeys())
-                                && Objects.equals(oldSchema.primaryKeys(), schema.primaryKeys())
-                                && Objects.equals(oldSchema.options(), schema.options());
-                if (ignoreIfExistsSame && isSame) {
-                    return oldSchema;
-                }
-
-                throw new IllegalStateException(
-                        "Schema in filesystem exists, please use updating,"
-                                + " latest schema is: "
-                                + oldSchema);
+                if (externalTable && checkSchemaForExternalTable(latest.get(), schema)) {
+                    return latest.get();
+                } else {
+                    throw new IllegalStateException(
+                            "Schema in filesystem exists, creation is not allowed.");
+                }
             }
 
             List<DataField> fields = schema.fields();
             List<String> partitionKeys = schema.partitionKeys();
             List<String> primaryKeys = schema.primaryKeys();

@@ -254,6 +246,39 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws
         }
     }
 
+    private boolean checkSchemaForExternalTable(TableSchema existsSchema, Schema newSchema) {
+        // When creating an external table, if the table already exists in the location, we can
+        // choose not to specify the fields.
+        if (newSchema.fields().isEmpty()
+                // When the fields are explicitly specified, we need check for consistency.
+                || (Objects.equals(existsSchema.fields(), newSchema.fields())
+                        && Objects.equals(existsSchema.partitionKeys(), newSchema.partitionKeys())
+                        && Objects.equals(existsSchema.primaryKeys(), newSchema.primaryKeys()))) {
+            // check for options
+            Map<String, String> existsOptions = existsSchema.options();
+            Map<String, String> newOptions = newSchema.options();
+            newOptions.forEach(
+                    (key, value) -> {
+                        if (!key.equals(Catalog.OWNER_PROP)
+                                && (!existsOptions.containsKey(key)
+                                        || !existsOptions.get(key).equals(value))) {
+                            throw new RuntimeException(
+                                    "New schema's options are not equal to the exists schema's, new schema: "
+                                            + newOptions
+                                            + ", exists schema: "
+                                            + existsOptions);
+                        }
+                    });
+        } else {
+            throw new RuntimeException(
+                    "New schema is not equal to exists schema, new schema: "
+                            + newSchema
+                            + ", exists schema: "
+                            + existsSchema);
+        }
+        return true;
+    }
+
     /** Update {@link SchemaChange}s. */
     public TableSchema commitChanges(SchemaChange... changes) throws Exception {
         return commitChanges(Arrays.asList(changes));
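For orientation, a minimal sketch (not part of the commit) of how the reworked method behaves when externalTable is true. It assumes Paimon's public Schema builder and a SchemaManager constructed from a FileIO plus table path; the location and option values are illustrative.

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataTypes;

public class ExternalCreateSketch {
    public static void main(String[] args) throws Exception {
        SchemaManager manager =
                new SchemaManager(LocalFileIO.create(), new Path("/tmp/warehouse/db.db/t1"));

        // First creation persists schema-0 under the table location.
        manager.createTable(
                Schema.newBuilder().column("id", DataTypes.INT()).option("k1", "v1").build());

        // Registering the same location as an external table may omit the fields
        // entirely; checkSchemaForExternalTable then only compares the options.
        Schema empty = Schema.newBuilder().option("k1", "v1").build();
        System.out.println(manager.createTable(empty, true).fields());

        // An option the existing schema does not carry is rejected.
        try {
            manager.createTable(Schema.newBuilder().option("k2", "v2").build(), true);
        } catch (RuntimeException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

Compared with the old ignoreIfExistsSame path, the check is looser in two ways: an empty field list is accepted, and the owner property (Catalog.OWNER_PROP) is exempt from the option comparison.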
File 2 of 6

@@ -720,11 +720,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
         try {
             tableSchema = schemaManager(identifier, location).createTable(schema, externalTable);
         } catch (Exception e) {
-            throw new RuntimeException(
-                    "Failed to commit changes of table "
-                            + identifier.getFullName()
-                            + " to underlying files.",
-                    e);
+            throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
         }
 
         try {

@@ -735,7 +731,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
                             identifier, tableSchema, location, externalTable)));
         } catch (Exception e) {
             try {
-                fileIO.deleteDirectoryQuietly(location);
+                if (!externalTable) {
+                    fileIO.deleteDirectoryQuietly(location);
+                }
             } catch (Exception ee) {
                 LOG.error("Delete directory[{}] fail for table {}", location, identifier, ee);
             }
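Note the asymmetry the new guard introduces: when registration in the catalog fails, the rollback still deletes the table directory for managed tables, but leaves an external table's location untouched, since data at an external location may pre-date, and should outlive, the catalog entry.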
File 3 of 6

@@ -75,7 +75,6 @@
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, SupportView {

@@ -298,26 +297,8 @@ public org.apache.spark.sql.connector.catalog.Table createTable(
             Map<String, String> properties)
             throws TableAlreadyExistsException, NoSuchNamespaceException {
         try {
-            String provider = properties.get(TableCatalog.PROP_PROVIDER);
-            if ((!usePaimon(provider))
-                    && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
-                Map<String, String> newProperties = new HashMap<>(properties);
-                newProperties.put(TYPE.key(), FORMAT_TABLE.toString());
-                newProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, newProperties),
-                        false);
-            } else {
-                checkArgument(
-                        usePaimon(provider),
-                        "SparkCatalog can only create paimon table, but current provider is %s",
-                        provider);
-                catalog.createTable(
-                        toIdentifier(ident),
-                        toInitialSchema(schema, partitions, properties),
-                        false);
-            }
+            catalog.createTable(
+                    toIdentifier(ident), toInitialSchema(schema, partitions, properties), false);
             return loadTable(ident);
         } catch (Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistsException(ident);

@@ -406,9 +387,12 @@ private static SchemaChange.Move getMove(
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> properties) {
         Map<String, String> normalizedProperties = new HashMap<>(properties);
-        if (!normalizedProperties.containsKey(TableCatalog.PROP_PROVIDER)) {
-            normalizedProperties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME());
+        String provider = properties.get(TableCatalog.PROP_PROVIDER);
+        if (!usePaimon(provider) && SparkSource.FORMAT_NAMES().contains(provider.toLowerCase())) {
+            normalizedProperties.put(TYPE.key(), FORMAT_TABLE.toString());
+            normalizedProperties.put(FILE_FORMAT.key(), provider.toLowerCase());
        }
+        normalizedProperties.remove(TableCatalog.PROP_PROVIDER);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
         if (normalizedProperties.containsKey(TableCatalog.PROP_LOCATION)) {
File 4 of 6

@@ -118,7 +118,7 @@ object SparkSource {
 
   val NAME = "paimon"
 
-  val FORMAT_NAMES = Seq("csv", "orc", "parquet")
+  val FORMAT_NAMES: Seq[String] = Seq("csv", "orc", "parquet")
 
   def toBaseRelation(table: FileStoreTable, _sqlContext: SQLContext): BaseRelation = {
     new BaseRelation {
File 5 of 6

@@ -161,7 +161,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
   test("Paimon DDL: create table without using paimon") {
     withTable("paimon_tbl") {
       sql("CREATE TABLE paimon_tbl (id int)")
-      assert(loadTable("paimon_tbl").options().get("provider").equals("paimon"))
+      assert(!loadTable("paimon_tbl").options().containsKey("provider"))
     }
   }
 
File 6 of 6

@@ -326,13 +326,7 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
                 spark.sql(
                   s"CREATE TABLE external_tbl (id INT) USING paimon LOCATION '$expertTbLocation'")
                 checkAnswer(spark.sql("SELECT * FROM external_tbl"), Row(1))
-                assert(
-                  loadTable("paimon_db", "external_tbl")
-                    .location()
-                    .toString
-                    .split(':')
-                    .apply(1)
-                    .equals(expertTbLocation))
+                assert(getActualTableLocation("paimon_db", "external_tbl").equals(expertTbLocation))
 
                 // create managed table
                 spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")

@@ -373,12 +367,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
                 spark.sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed")
                 checkAnswer(spark.sql("SELECT * FROM external_tbl_renamed"), Row(1))
                 assert(
-                  loadTable("paimon_db", "external_tbl_renamed")
-                    .location()
-                    .toString
-                    .split(':')
-                    .apply(1)
-                    .equals(expertTbLocation))
+                  getActualTableLocation("paimon_db", "external_tbl_renamed").equals(
+                    expertTbLocation))
 
                 // create managed table
                 spark.sql(s"CREATE TABLE managed_tbl (id INT) USING paimon")

@@ -389,12 +379,55 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
                 spark.sql("ALTER TABLE managed_tbl RENAME TO managed_tbl_renamed")
                 checkAnswer(spark.sql("SELECT * FROM managed_tbl_renamed"), Row(1))
                 assert(
-                  !loadTable("paimon_db", "managed_tbl_renamed")
-                    .location()
-                    .toString
-                    .split(':')
-                    .apply(1)
-                    .equals(managedTbLocation.toString))
+                  !getActualTableLocation("paimon_db", "managed_tbl_renamed").equals(
+                    managedTbLocation.toString))
               }
             }
         }
+    }
+  }
+
+  test("Paimon DDL with hive catalog: create external table without schema") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        spark.sql(s"USE $catalogName")
+        withTempDir {
+          tbLocation =>
+            withDatabase("paimon_db") {
+              spark.sql(s"CREATE DATABASE IF NOT EXISTS paimon_db")
+              spark.sql(s"USE paimon_db")
+              withTable("t1", "t2", "t3", "t4", "t5") {
+                val expertTbLocation = tbLocation.getCanonicalPath
+                spark.sql(s"""
+                             |CREATE TABLE t1 (id INT, pt INT) USING paimon
+                             |PARTITIONED BY (pt)
+                             |TBLPROPERTIES('primary-key' = 'id', 'k1' = 'v1')
+                             |LOCATION '$expertTbLocation'
+                             |""".stripMargin)
+                spark.sql("INSERT INTO t1 VALUES (1, 1)")
+
+                // create table without schema
+                spark.sql(s"CREATE TABLE t2 USING paimon LOCATION '$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM t2"), Row(1, 1))
+                assert(getActualTableLocation("paimon_db", "t2").equals(expertTbLocation))
+
+                // create table with wrong schema
+                intercept[Exception] {
+                  spark.sql(
+                    s"CREATE TABLE t3 (fake_col INT) USING paimon LOCATION '$expertTbLocation'")
+                }
+
+                // create table with exists props
+                spark.sql(
+                  s"CREATE TABLE t4 USING paimon TBLPROPERTIES ('k1' = 'v1') LOCATION '$expertTbLocation'")
+                checkAnswer(spark.sql("SELECT * FROM t4"), Row(1, 1))
+                assert(getActualTableLocation("paimon_db", "t4").equals(expertTbLocation))
+
+                // create table with new props
+                intercept[Exception] {
+                  spark.sql(
+                    s"CREATE TABLE t5 USING paimon TBLPROPERTIES ('k2' = 'v2') LOCATION '$expertTbLocation'")
+                }
+              }
+            }
+        }

@@ -445,4 +478,8 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
       .toMap
     tableProps("path").split(":")(1)
   }
+
+  def getActualTableLocation(dbName: String, tblName: String): String = {
+    loadTable(dbName, tblName).location().toString.split(':').apply(1)
+  }
 }
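The new getActualTableLocation helper centralizes the scheme-stripping the tests previously inlined: location().toString yields a URI such as file:/tmp/warehouse/t, and split(':').apply(1) keeps only the path component that is compared against the expected location.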
