Skip to content

Commit

Permalink
[spark] Supports CTAS for different versions (apache#2591)
Browse files Browse the repository at this point in the history
  • Loading branch information
YannByron authored Jan 2, 2024
1 parent f159d9a commit e733ed5
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 117 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.shim

import org.apache.paimon.CoreOptions

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._

/**
 * Planner strategy that maps a `CreateTableAsSelect` logical plan onto a
 * `CreateTableAsSelectExec` physical plan.
 *
 * The options attached to the statement are split in two: entries whose key is one of the keys
 * returned by `CoreOptions.getOptions` are appended to the table properties, and all remaining
 * entries are passed to the exec node as write options.
 *
 * @param spark
 *   the session this strategy is created with; it is not referenced inside this class
 */
case class PaimonCreateTableAsSelectStrategy(spark: SparkSession) extends Strategy {

  /**
   * @param plan
   *   the logical plan to translate
   * @return
   *   a single `CreateTableAsSelectExec` for a `CreateTableAsSelect` plan, `Nil` for any other plan
   * @throws RuntimeException
   *   if the target catalog is a `StagingTableCatalog`
   */
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
      catalog match {
        case _: StagingTableCatalog =>
          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
        case _ =>
          // Collected into a Set so that the per-option membership test below is a hash lookup
          // instead of a linear scan over every known core option key.
          val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSet
          val (coreOptions, writeOptions) = options.partition {
            case (key, _) => coreOptionKeys.contains(key)
          }
          // Core options travel with the table properties; on a key clash the option value wins
          // because it is the right-hand operand of `++`.
          val newProps = CatalogV2Util.withDefaultOwnership(props) ++ coreOptions
          CreateTableAsSelectExec(
            catalog,
            ident,
            parts,
            query,
            planLater(query),
            newProps,
            new CaseInsensitiveStringMap(writeOptions.asJava),
            ifNotExists
          ) :: Nil
      }
    case _ => Nil
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

/** Concrete suite that adds no members of its own; everything comes from `DDLTestBase`. */
class DDLTest extends DDLTestBase
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,4 @@
*/
package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.junit.jupiter.api.Assertions

/** DDL test suite covering `CREATE TABLE ... AS SELECT`. */
class DDLTest extends PaimonSparkTestBase {

  import testImplicits._

  test("Paimon: Create Table As Select") {
    // Two rows registered as the temp view that both CTAS statements read from.
    val sourceRows = Seq((1L, "x1", "2023"), (2L, "x2", "2023"))
    sourceRows.toDF("a", "b", "pt").createOrReplaceTempView("source")

    // Case 1: plain CTAS without partitioning or table properties.
    val createPlainTable =
      """
        |CREATE TABLE t1 AS SELECT * FROM source
        |""".stripMargin
    spark.sql(createPlainTable)

    val plainTable = loadTable("t1")
    Assertions.assertTrue(plainTable.primaryKeys().isEmpty)
    Assertions.assertTrue(plainTable.partitionKeys().isEmpty)

    // Case 2: CTAS with a partition column and table properties.
    val createKeyedTable =
      """
        |CREATE TABLE t2
        |PARTITIONED BY (pt)
        |TBLPROPERTIES ('bucket' = '5', 'primary-key' = 'a,pt', 'target-file-size' = '128MB')
        |AS SELECT * FROM source
        |""".stripMargin
    spark.sql(createKeyedTable)

    val keyedTable = loadTable("t2")

    val primaryKeys = keyedTable.primaryKeys()
    Assertions.assertEquals(2, primaryKeys.size())
    Assertions.assertTrue(primaryKeys.contains("a"))
    Assertions.assertTrue(primaryKeys.contains("pt"))

    val partitionKeys = keyedTable.partitionKeys()
    Assertions.assertEquals(1, partitionKeys.size())
    Assertions.assertEquals("pt", partitionKeys.get(0))

    // check all the core options
    val tableOptions = keyedTable.options()
    Assertions.assertEquals("5", tableOptions.get("bucket"))
    Assertions.assertEquals("128MB", tableOptions.get("target-file-size"))
  }

}
/** Declares no tests directly; the suite body is inherited from `DDLTestBase`. */
class DDLTest extends DDLTestBase
34 changes: 16 additions & 18 deletions paimon-spark/paimon-spark-3.4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
Expand Down Expand Up @@ -137,24 +153,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.shim

import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.SparkCatalog

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, LogicalPlan, TableSpec}
import org.apache.spark.sql.connector.catalog.StagingTableCatalog
import org.apache.spark.sql.execution.{PaimonStrategyHelper, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.CreateTableAsSelectExec
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._

/**
 * Planner strategy that maps a `CreateTableAsSelect` logical plan targeting a `SparkCatalog`
 * onto a `CreateTableAsSelectExec` physical plan.
 *
 * The options attached to the statement are split in two: entries whose key is one of the keys
 * returned by `CoreOptions.getOptions` are merged into the `TableSpec` properties, and all
 * remaining entries are passed to the exec node as write options.
 *
 * @param spark
 *   the session this strategy is created with; it is not referenced directly in this class body
 */
case class PaimonCreateTableAsSelectStrategy(spark: SparkSession)
  extends Strategy
  with PaimonStrategyHelper {

  // NOTE(review): presumably the source of `asTableCatalog` used below; confirm.
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  /**
   * @param plan
   *   the logical plan to translate
   * @return
   *   a single `CreateTableAsSelectExec` when the plan is a `CreateTableAsSelect` resolved against
   *   a `SparkCatalog`, `Nil` for any other plan
   * @throws RuntimeException
   *   if the matched catalog is also a `StagingTableCatalog`
   */
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case CreateTableAsSelect(
          ResolvedIdentifier(catalog: SparkCatalog, ident),
          parts,
          query,
          tableSpec: TableSpec,
          options,
          ifNotExists,
          analyzedQuery) =>
      // The analyzed query is unwrapped with `.get` further down, so it must be present here.
      assert(analyzedQuery.isDefined)
      catalog match {
        case _: StagingTableCatalog =>
          throw new RuntimeException("Paimon can't extend StagingTableCatalog for now.")
        case _ =>
          // Collected into a Set so that the per-option membership test below is a hash lookup
          // instead of a linear scan over every known core option key.
          val coreOptionKeys = CoreOptions.getOptions.asScala.map(_.key()).toSet
          val (coreOptions, writeOptions) = options.partition {
            case (key, _) => coreOptionKeys.contains(key)
          }
          // Core options travel with the table properties; on a key clash the option value wins
          // because it is the right-hand operand of `++`.
          val newTableSpec = tableSpec.copy(properties = tableSpec.properties ++ coreOptions)
          CreateTableAsSelectExec(
            catalog.asTableCatalog,
            ident,
            parts,
            analyzedQuery.get,
            planLater(query),
            // NOTE(review): `qualifyLocInTableSpec` is not defined in this file (presumably it
            // comes from PaimonStrategyHelper); its exact effect on the spec should be confirmed.
            qualifyLocInTableSpec(newTableSpec),
            new CaseInsensitiveStringMap(writeOptions.asJava),
            ifNotExists
          ) :: Nil
      }
    case _ => Nil
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

/** Empty subclass of `DDLTestBase`; it defines no additional members. */
class DDLTest extends DDLTestBase
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.paimon.spark.sql

/** Suite with an empty body: all of its content is inherited from `DDLTestBase`. */
class DDLTest extends DDLTestBase

This file was deleted.

Loading

0 comments on commit e733ed5

Please sign in to comment.