[flink] fix false assumption on catalog table passing logic (#4381)

yunfengzhou-hub authored Oct 30, 2024
1 parent 8b31057 commit 74634b9

Showing 5 changed files with 93 additions and 10 deletions.
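In short: buildPaimonTable used to assume that any table whose origin was not a DataCatalogTable must have been declared as a plain connector table, and rebuilt it from its 'path' option via FileStoreTableFactory.create. That assumption is false for tables (for example, materialized tables) that live in a FlinkCatalog but are not handed over as a DataCatalogTable. The patched resolution order, condensed into a sketch (names as in the diff below; the surrounding option-merging code is omitted):

    FileStoreTable fileStoreTable;
    if (origin instanceof DataCatalogTable) {
        // 1. Flink passed the Paimon table object through directly.
        fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
    } else if (flinkCatalog == null) {
        // 2. Paimon used as a plain connector, with no catalog attached: build from options.
        fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
    } else {
        // 3. Otherwise ask the owning catalog for the table by identifier.
        Identifier identifier =
                Identifier.create(
                        context.getObjectIdentifier().getDatabaseName(),
                        context.getObjectIdentifier().getObjectName());
        try {
            fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
        } catch (Catalog.TableNotExistException e) {
            throw new RuntimeException(e);
        }
    }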
AbstractFlinkTableFactory.java
@@ -22,7 +22,9 @@
 import org.apache.paimon.CoreOptions.LogConsistency;
 import org.apache.paimon.CoreOptions.StreamingReadMode;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.sink.FlinkTableSink;
@@ -89,6 +91,12 @@ public abstract class AbstractFlinkTableFactory
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFlinkTableFactory.class);
 
+    @Nullable private final FlinkCatalog flinkCatalog;
+
+    public AbstractFlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
+        this.flinkCatalog = flinkCatalog;
+    }
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
@@ -227,7 +235,7 @@ static CatalogContext createCatalogContext(DynamicTableFactory.Context context)
                 Options.fromMap(context.getCatalogTable().getOptions()), new FlinkFileIOLoader());
     }
 
-    static Table buildPaimonTable(DynamicTableFactory.Context context) {
+    Table buildPaimonTable(DynamicTableFactory.Context context) {
         CatalogTable origin = context.getCatalogTable().getOrigin();
         Table table;
 
@@ -243,16 +251,28 @@ static Table buildPaimonTable(DynamicTableFactory.Context context) {
         newOptions.putAll(origin.getOptions());
         newOptions.putAll(dynamicOptions);
 
-        // notice that the Paimon table schema must be the same with the Flink's
+        FileStoreTable fileStoreTable;
         if (origin instanceof DataCatalogTable) {
-            FileStoreTable fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
-            table = fileStoreTable.copyWithoutTimeTravel(newOptions);
+            fileStoreTable = (FileStoreTable) ((DataCatalogTable) origin).table();
+        } else if (flinkCatalog == null) {
+            // In case Paimon is directly used as a Flink connector, instead of through catalog.
+            fileStoreTable = FileStoreTableFactory.create(createCatalogContext(context));
         } else {
-            table =
-                    FileStoreTableFactory.create(createCatalogContext(context))
-                            .copyWithoutTimeTravel(newOptions);
+            // In cases like materialized table, the Paimon table might not be DataCatalogTable,
+            // but can still be acquired through the catalog.
+            Identifier identifier =
+                    Identifier.create(
+                            context.getObjectIdentifier().getDatabaseName(),
+                            context.getObjectIdentifier().getObjectName());
+            try {
+                fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier);
+            } catch (Catalog.TableNotExistException e) {
+                throw new RuntimeException(e);
+            }
         }
+        table = fileStoreTable.copyWithoutTimeTravel(newOptions);
 
+        // notice that the Paimon table schema must be the same with the Flink's
         Schema schema = FlinkCatalog.fromCatalogTable(context.getCatalogTable());
 
         RowType rowType = toLogicalType(schema.rowType());
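One detail worth calling out in the new catalog branch: Flink's ObjectIdentifier is three-part (catalog.database.object), while Paimon's Identifier is two-part. The catalog name is simply dropped, since flinkCatalog.catalog() is already scoped to a single Paimon catalog. A minimal, self-contained illustration (identifier values are hypothetical):

    import org.apache.flink.table.catalog.ObjectIdentifier;
    import org.apache.paimon.catalog.Identifier;

    public class IdentifierMappingSketch {
        public static void main(String[] args) {
            // Hypothetical three-part Flink identifier: catalog.database.object.
            ObjectIdentifier flinkId = ObjectIdentifier.of("my_catalog", "my_db", "my_table");
            // Only database and object name survive the mapping; the catalog name is
            // implied by the Catalog instance the lookup goes through.
            Identifier paimonId =
                    Identifier.create(flinkId.getDatabaseName(), flinkId.getObjectName());
            System.out.println(paimonId.getFullName()); // my_db.my_table
        }
    }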
FlinkCatalog.java
@@ -203,7 +203,7 @@ public Catalog catalog() {
 
     @Override
     public Optional<Factory> getFactory() {
-        return Optional.of(new FlinkTableFactory());
+        return Optional.of(new FlinkTableFactory(this));
     }
 
     @Override
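This one-line change is what threads the catalog into the factory. Reduced to a self-contained sketch with hypothetical stand-in classes (not Paimon's real types), the pattern is: a catalog passes itself to the factory it exposes, so the factory can fall back to a catalog lookup instead of assuming a path-based build.

    import java.util.Optional;

    public class CatalogAwareFactorySketch {
        static class SketchCatalog {
            Optional<SketchFactory> getFactory() {
                // Before the fix this was effectively new SketchFactory(null).
                return Optional.of(new SketchFactory(this));
            }

            String getTable(String name) {
                return "resolved-via-catalog:" + name;
            }
        }

        static class SketchFactory {
            // Null when the factory is instantiated without a catalog (plain connector use).
            private final SketchCatalog owner;

            SketchFactory(SketchCatalog owner) {
                this.owner = owner;
            }

            String buildTable(String name) {
                return owner == null ? "built-from-path:" + name : owner.getTable(name);
            }
        }

        public static void main(String[] args) {
            SketchCatalog catalog = new SketchCatalog();
            System.out.println(catalog.getFactory().get().buildTable("t")); // resolved-via-catalog:t
            System.out.println(new SketchFactory(null).buildTable("t"));    // built-from-path:t
        }
    }

The zero-argument constructor kept in FlinkTableFactory (next file) is presumably what Flink's reflective SPI discovery uses; no catalog can be supplied on that path, which is exactly the flinkCatalog == null branch.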
FlinkTableFactory.java
@@ -30,11 +30,20 @@
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableFactory;
 
+import javax.annotation.Nullable;
+
 import static org.apache.paimon.CoreOptions.AUTO_CREATE;
 import static org.apache.paimon.flink.FlinkCatalogFactory.IDENTIFIER;
 
 /** A paimon {@link DynamicTableFactory} to create source and sink. */
 public class FlinkTableFactory extends AbstractFlinkTableFactory {
+    public FlinkTableFactory() {
+        this(null);
+    }
+
+    public FlinkTableFactory(@Nullable FlinkCatalog flinkCatalog) {
+        super(flinkCatalog);
+    }
 
     @Override
     public String factoryIdentifier() {
FlinkCatalogTest.java
@@ -28,9 +28,12 @@
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableDescriptor;
@@ -42,6 +45,7 @@
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -60,6 +64,7 @@
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.refresh.RefreshHandler;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -88,6 +93,7 @@
 import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
 import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM;
+import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatCollection;
@@ -746,6 +752,52 @@ void testCreateTableFromTableDescriptor() throws Exception {
         checkCreateTable(path1, catalogTable, (CatalogTable) catalog.getTable(path1));
     }
 
+    @Test
+    void testBuildPaimonTableWithCustomScheme() throws Exception {
+        catalog.createDatabase(path1.getDatabaseName(), null, false);
+        CatalogTable table = createTable(optionProvider(false).iterator().next());
+        catalog.createTable(path1, table, false);
+        checkCreateTable(path1, table, catalog.getTable(path1));
+
+        List<Column> columns =
+                Arrays.asList(
+                        Column.physical("first", DataTypes.STRING()),
+                        Column.physical("second", DataTypes.INT()),
+                        Column.physical("third", DataTypes.STRING()),
+                        Column.physical(
+                                "four",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f1", DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", DataTypes.INT()),
+                                        DataTypes.FIELD(
+                                                "f3",
+                                                DataTypes.MAP(
+                                                        DataTypes.STRING(), DataTypes.INT())))));
+        DynamicTableFactory.Context context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        ObjectIdentifier.of(
+                                "default", path1.getDatabaseName(), path1.getObjectName()),
+                        createResolvedTable(
+                                new HashMap<String, String>() {
+                                    {
+                                        put("path", "unsupported-scheme://foobar");
+                                    }
+                                },
+                                columns,
+                                Collections.emptyList(),
+                                Collections.emptyList()),
+                        Collections.emptyMap(),
+                        new Configuration(),
+                        Thread.currentThread().getContextClassLoader(),
+                        false);
+
+        FlinkTableFactory factory = (FlinkTableFactory) catalog.getFactory().get();
+        Table builtTable = factory.buildPaimonTable(context);
+        assertThat(builtTable).isInstanceOf(FileStoreTable.class);
+        assertThat(((FileStoreTable) builtTable).schema().fieldNames())
+                .containsExactly("first", "second", "third", "four");
+    }
+
     private void checkCreateTable(
             ObjectPath path, CatalogBaseTable expected, CatalogBaseTable actual) {
         checkEquals(
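The 'path' option in this test pointing at unsupported-scheme://foobar appears deliberate: had buildPaimonTable taken the old fallback (FileStoreTableFactory.create, which would try to open that path), the unknown scheme would have made it fail, so a passing run indicates the table was resolved through the catalog branch instead.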
ReadWriteTableITCase.java
@@ -76,7 +76,6 @@
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
-import static org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
 import static org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
@@ -1827,7 +1826,10 @@ private void testSinkParallelism(Integer configParallelism, int expectedParallel
 
         DynamicTableSink tableSink =
                 new FlinkTableSink(
-                        context.getObjectIdentifier(), buildPaimonTable(context), context, null);
+                        context.getObjectIdentifier(),
+                        new FlinkTableFactory().buildPaimonTable(context),
+                        context,
+                        null);
         assertThat(tableSink).isInstanceOf(FlinkTableSink.class);
 
         // 2. get sink provider
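Note that because buildPaimonTable is no longer static, this test now builds the table through a catalog-less factory instance (new FlinkTableFactory(), which delegates to this(null)), so the flinkCatalog == null branch preserves exactly the behavior the old static method had on this path.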
