Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Nov 29, 2023
1 parent 010b8c3 commit 2fb982f
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 29 deletions.
4 changes: 0 additions & 4 deletions paimon-spark/paimon-spark-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ under the License.
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.FieldReference;
Expand All @@ -58,7 +57,7 @@
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;

/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog implements TableCatalog, SupportsNamespaces {
public class SparkCatalog extends SparkBaseCatalog {

private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import org.apache.paimon.spark.procedure.ProcedureBuilder;

import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;

public abstract class SparkBaseCatalog implements ProcedureCatalog, WithPaimonCatalog {
/** Spark base catalog. */
public abstract class SparkBaseCatalog
implements TableCatalog, SupportsNamespaces, ProcedureCatalog, WithPaimonCatalog {
@Override
public Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException {
if (Catalog.SYSTEM_DATABASE_NAME.equals(identifier.namespace()[0])) {
ProcedureBuilder builder = SparkProcedures.newBuilder(identifier.name());
if (builder != null) {
return builder.withTableCatalog((TableCatalog) this).build();
return builder.withTableCatalog(this).build();
}
}
throw new NoSuchProcedureException(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
super.sparkConf
.set("spark.sql.warehouse.dir", tempDBDir.getCanonicalPath)
.set("spark.sql.catalogImplementation", "hive")
.set("spark.sql.catalog.spark_catalog", "org.apache.paimon.spark.SparkGenericCatalog")
.set("spark.sql.catalog.spark_catalog", classOf[SparkGenericCatalog[_]].getName)
.set("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
}

Expand All @@ -50,6 +50,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
spark.sql("USE default")
spark.sql(s"DROP DATABASE $hiveDbName CASCADE")
} finally {
super.afterAll()
testHiveMetastore.stop()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,20 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
Seq("parquet", "orc").foreach(
format => {
test(s"Paimon migrate table procedure: migrate $format non-partitioned table") {
val hiveTbl = s"${format}_hive_tbl"
withTable(hiveTbl) {
withTable("hive_tbl") {
// create hive table
spark.sql(s"""
|CREATE TABLE $hiveTbl (id STRING, name STRING, pt STRING)
|CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
|USING $format
|""".stripMargin)

spark.sql(s"INSERT INTO $hiveTbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")

checkAnswer(
spark.sql(s"SELECT * FROM $hiveTbl ORDER BY id"),
Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")

spark.sql(
s"CALL sys.migrate_table(format => 'hive', table => '$hiveDbName.$hiveTbl', tblproperties => 'file.format=$format')")
s"CALL sys.migrate_table(format => 'hive', table => '$hiveDbName.hive_tbl', tblproperties => 'file.format=$format')")

checkAnswer(
spark.sql(s"SELECT * FROM $hiveTbl ORDER BY id"),
spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
}
}
Expand All @@ -52,26 +47,21 @@ class MigrateTableProcedureTest extends PaimonHiveTestBase {
Seq("parquet", "orc").foreach(
format => {
test(s"Paimon migrate table procedure: migrate $format partitioned table") {
val hiveTbl = s"${format}_hive_pt_tbl"
withTable(hiveTbl) {
withTable("hive_tbl") {
// create hive table
spark.sql(s"""
|CREATE TABLE $hiveTbl (id STRING, name STRING, pt STRING)
|CREATE TABLE hive_tbl (id STRING, name STRING, pt STRING)
|USING $format
|PARTITIONED BY (pt)
|""".stripMargin)

spark.sql(s"INSERT INTO $hiveTbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")

checkAnswer(
spark.sql(s"SELECT * FROM $hiveTbl ORDER BY id"),
Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
spark.sql(s"INSERT INTO hive_tbl VALUES ('1', 'a', 'p1'), ('2', 'b', 'p2')")

spark.sql(
s"CALL sys.migrate_table(format => 'hive', table => '$hiveDbName.$hiveTbl', tblproperties => 'file.format=$format')")
s"CALL sys.migrate_table(format => 'hive', table => '$hiveDbName.hive_tbl', tblproperties => 'file.format=$format')")

checkAnswer(
spark.sql(s"SELECT * FROM $hiveTbl ORDER BY id"),
spark.sql(s"SELECT * FROM hive_tbl ORDER BY id"),
Row("1", "a", "p1") :: Row("2", "b", "p2") :: Nil)
}
}
Expand Down

0 comments on commit 2fb982f

Please sign in to comment.