Commit

[followup]
YannByron committed Jun 14, 2024
1 parent 5b16576 commit dda1d06
Showing 2 changed files with 11 additions and 6 deletions.

@@ -37,9 +37,7 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
 
     case a @ PaimonV2WriteCommand(table, _)
-        if a.resolved && !schemaCompatible(
-          a.query.output.toStructType,
-          table.output.toStructType) =>
+        if !schemaCompatible(a.query.output.toStructType, table.output.toStructType) =>
       val newQuery = resolveQueryColumns(a.query, table.output)
       if (newQuery != a.query) {
         Compatibility.withNewQuery(a, newQuery)
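With this change the rule is guarded only by the schema-compatibility check between the write query's output and the table's output, without the separate a.resolved test. As an illustration of what a guard like schemaCompatible typically does, here is a minimal Scala sketch (not Paimon's actual implementation) that compares the two schemas field by field:

// Minimal sketch only, assuming a simplified notion of compatibility:
// matching field count, names, and exact data types. A real check may
// also allow safe up-casts and ignore nullability differences.
import org.apache.spark.sql.types.StructType

object SchemaCompatibilitySketch {
  def schemaCompatible(queryOutput: StructType, tableOutput: StructType): Boolean =
    queryOutput.length == tableOutput.length &&
      queryOutput.fields.zip(tableOutput.fields).forall { case (q, t) =>
        q.name == t.name && q.dataType == t.dataType
      }
}

When the check fails, the rule above rewrites the query (resolveQueryColumns) so its columns line up with the table's output before the write is planned.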
@@ -24,6 +24,7 @@
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -67,9 +68,15 @@ public abstract class SparkReadTestBase {
     @BeforeAll
     public static void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
         warehousePath = new Path("file:" + tempDir.toString());
-        spark = SparkSession.builder().master("local[2]").getOrCreate();
-        spark.conf().set("spark.sql.catalog.paimon", SparkCatalog.class.getName());
-        spark.conf().set("spark.sql.catalog.paimon.warehouse", warehousePath.toString());
+        spark =
+                SparkSession.builder()
+                        .master("local[2]")
+                        .config("spark.sql.catalog.paimon", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.paimon.warehouse", warehousePath.toString())
+                        .config(
+                                "spark.sql.extensions",
+                                PaimonSparkSessionExtensions.class.getName())
+                        .getOrCreate();
         spark.sql("USE paimon");
     }

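The test setup now passes the Paimon catalog settings to the SparkSession builder and additionally registers PaimonSparkSessionExtensions through spark.sql.extensions, a static configuration that has to be supplied before getOrCreate() rather than set on an existing session. A minimal Scala sketch of the same setup in standalone code; the warehouse path and object name are illustrative placeholders, and the SparkCatalog import path is assumed:

// Sketch under the assumption that SparkCatalog lives in org.apache.paimon.spark
// and that a local filesystem warehouse is acceptable for experimentation.
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
import org.apache.spark.sql.SparkSession

object PaimonSessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      // Catalog and extensions are supplied before getOrCreate();
      // spark.sql.extensions in particular is a static conf and cannot be
      // changed on an already-running session.
      .config("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
      .config("spark.sql.extensions", classOf[PaimonSparkSessionExtensions].getName)
      .getOrCreate()

    spark.sql("USE paimon")
    spark.sql("SHOW TABLES").show()
    spark.stop()
  }
}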
