From 74634b9cd07e109295e3cc4b2252e0cd9e258c77 Mon Sep 17 00:00:00 2001
From: yunfengzhou-hub
Date: Wed, 30 Oct 2024 17:25:12 +0800
Subject: [PATCH] [flink] fix false assumption on catalog table passing logic
 (#4381)

---
 .../flink/AbstractFlinkTableFactory.java | 34 +++++++++---
 .../org/apache/paimon/flink/FlinkCatalog.java | 2 +-
 .../paimon/flink/FlinkTableFactory.java | 9 ++++
 .../apache/paimon/flink/FlinkCatalogTest.java | 52 +++++++++++++++++++
 .../paimon/flink/ReadWriteTableITCase.java | 6 ++-
 5 files changed, 93 insertions(+), 10 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index e469044f5f4f..27eef48b227b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -22,7 +22,9 @@
 import org.apache.paimon.CoreOptions.LogConsistency;
 import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.sink.FlinkTableSink;
@@ -89,6 +91,12 @@ public abstract class AbstractFlinkTableFactory
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkTableFactory.class);
 
+    @Nullable private final FlinkCatalog flinkCatalog;
+
+    public AbstractFlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
+        this.flinkCatalog = flinkCatalog;
+    }
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
@@ -227,7 +235,7 @@ static CatalogContext createCatalogContext(DynamicTableFactory.Context context)
                 Options.fromMap(context.getCatalogTable().getOptions()), new FlinkFileIOLoader());
     }
 
-    static Table buildPaimonTable(DynamicTableFactory.Context context) {
+    Table buildPaimonTable(DynamicTableFactory.Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
         Table table;
 
@@ -243,16 +251,28 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
         newOptions.putAll(origin.getOptions());
         newOptions.putAll(dynamicOptions);
 
-        // notice that the Paimon table schema must be the same with the Flink's
+        FileStoreTable fileStoreTable;
         if (origin instanceof DataCatalogTable) {
-            FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
-            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
+            fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
+        } else if (flinkCatalog == null) {
+            // In case Paimon is directly used as a Flink connector, instead of through catalog.
+            fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
         } else {
-            table =
-                    FileStoreTableFactory.create(createCatalogContext(context))
-                            .copyWithoutTimeTravel(newOptions);
+            // In cases like materialized table, the Paimon table might not be DataCatalogTable,
+            // but can still be acquired through the catalog.
+            Identifier identifier =
+                    Identifier.create(
+                            context.getObjectIdentifier().getDatabaseName(),
+                            context.getObjectIdentifier().getObjectName());
+            try {
+                fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
+            } catch (Catalog.TableNotExistException e) {
+                throw new RuntimeException(e);
+            }
         }
+        table = fileStoreTable.copyWithoutTimeTravel(newOptions);
 
+        // notice that the Paimon table schema must be the same with the Flink's
         Schema schema = FlinkCatalog.fromCatalogTable(context.getCatalogTable());
         RowType rowType = toLogicalType(schema.rowType());
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index bfcdfdb55f37..2a2e104c2c72 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -203,7 +203,7 @@ public Catalog catalog() {
 
     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkTableFactory());
+        return Optional.of(new FlinkTableFactory(this));
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
index 96c81fdb720c..d5c1ed043b56 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkTableFactory.java
@@ -30,11 +30,20 @@
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 
+import javax.annotation.Nullable;
+
 import static org.apache.paimon.CoreOptions.AUTO_CREATE;
 import static org.apache.paimon.flink.FlinkCatalogFactory.IDENTIFIER;
 
 /** A paimon {@link DynamicTableFactory} to create source and sink. */
 public class FlinkTableFactory extends AbstractFlinkTableFactory {
+    public FlinkTableFactory() {
+        this(null);
+    }
+
+    public FlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
+        super(flinkCatalog);
+    }
 
     @Override
     public String factoryIdentifier() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 4f2db7aabeed..27a89510975f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -28,9 +28,12 @@
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableDescriptor;
@@ -42,6 +45,7 @@
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -60,6 +64,7 @@
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.refresh.RefreshHandler;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -88,6 +93,7 @@
 import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatCollection;
@@ -746,6 +752,52 @@ void testCreateTableFromTableDescriptor() throws Exception {
         checkCreateTable(path1, catalogTable, (CatalogTable) catalog.getTable(path1));
     }
 
+    @Test
+    void testBuildPaimonTableWithCustomScheme() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = createTable(optionProvider(false).iterator().next());
+        catalog.createTable(path1, table, false);
+        checkCreateTable(path1, table, catalog.getTable(path1));
+
+        List<Column> columns =
+                Arrays.asList(
+                        Column.physical("first", DataTypes.STRING()),
+                        Column.physical("second", DataTypes.INT()),
+                        Column.physical("third", DataTypes.STRING()),
+                        Column.physical(
+                                "four",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT()),
+                                        DataTypes.FIELD(
+                                                "f3",
+                                                DataTypes.MAP(
+                                                        DataTypes.STRING(), DataTypes.INT())))));
+        DynamicTableFactory.Context context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        ObjectIdentifier.of(
+                                "default", path1.getDatabaseName(), path1.getObjectName()),
+                        createResolvedTable(
+                                new HashMap<String, String>() {
+                                    {
+                                        put("path", "unsupported-scheme://foobar");
+                                    }
+                                },
+                                columns,
+                                Collections.emptyList(),
+                                Collections.emptyList()),
+                        Collections.emptyMap(),
+                        new Configuration(),
+                        Thread.currentThread().getContextClassLoader(),
+                        false);
+
+        FlinkTableFactory factory = (FlinkTableFactory) catalog.getFactory().get();
+        Table builtTable = factory.buildPaimonTable(context);
+        assertThat(builtTable).isInstanceOf(FileStoreTable.class);
+        assertThat(((FileStoreTable) builtTable).schema().fieldNames())
+                .containsExactly("first", "second", "third", "four");
+    }
+
     private void checkCreateTable(
             ObjectPath path, CatalogBaseTable expected, CatalogBaseTable actual) {
         checkEquals(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index d1e9b23e1b54..10de1ae4839f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -76,7 +76,6 @@
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
-import static org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
 import static org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
@@ -1827,7 +1826,10 @@ private void testSinkParallelism(Integer configParallelism, int expectedParallel
 
         DynamicTableSink tableSink =
                 new FlinkTableSink(
-                        context.getObjectIdentifier(), buildPaimonTable(context), context, null);
+                        context.getObjectIdentifier(),
+                        new FlinkTableFactory().buildPaimonTable(context),
+                        context,
+                        null);
         assertThat(tableSink).isInstanceOf(FlinkTableSink.class);
 
         // 2. get sink provider
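
For readers of this patch, the following is a minimal standalone sketch of the table-resolution order that buildPaimonTable follows after the fix: a DataCatalogTable origin is used directly; with no FlinkCatalog available (Paimon used as a bare Flink connector) the table is rebuilt from the table options; any other origin (for example a materialized table) is looked up through the owning catalog by identifier. The class and method names TableResolutionSketch and resolveFileStoreTable are illustrative only, the sketch is assumed to live in the org.apache.paimon.flink package so DataCatalogTable and FlinkFileIOLoader need no imports, and option merging plus copyWithoutTimeTravel are omitted; everything else mirrors calls that appear in the diff above.

// Hypothetical sketch, not part of the patch.
package org.apache.paimon.flink;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;

import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.factories.DynamicTableFactory;

import javax.annotation.Nullable;

class TableResolutionSketch {

    /** Mirrors the resolution order of AbstractFlinkTableFactory#buildPaimonTable after the fix. */
    static FileStoreTable resolveFileStoreTable(
            DynamicTableFactory.Context context, @Nullable FlinkCatalog flinkCatalog) {
        CatalogTable origin = context.getCatalogTable().getOrigin();
        if (origin instanceof DataCatalogTable) {
            // Tables created through the Paimon catalog already carry the Paimon table.
            return (FileStoreTable) ((DataCatalogTable) origin).table();
        } else if (flinkCatalog == null) {
            // Paimon used directly as a Flink connector: rebuild the table from its options/path.
            CatalogContext catalogContext =
                    CatalogContext.create(
                            Options.fromMap(context.getCatalogTable().getOptions()),
                            new FlinkFileIOLoader());
            return FileStoreTableFactory.create(catalogContext);
        } else {
            // Other origins (e.g. materialized tables): the origin is not a DataCatalogTable,
            // but the table can still be fetched from the underlying Paimon catalog.
            ObjectIdentifier id = context.getObjectIdentifier();
            Identifier identifier = Identifier.create(id.getDatabaseName(), id.getObjectName());
            try {
                return (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
            } catch (Catalog.TableNotExistException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

When the planner obtains the factory through FlinkCatalog#getFactory(), the catalog branch is the one taken; the new testBuildPaimonTableWithCustomScheme exercises it with a "path" whose scheme the option-based fallback would not be able to load.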