Skip to content

Commit

Permalink
[flink] Lookup source should ignore scan options (#4383)
Browse files Browse the repository at this point in the history
This closes #4383.
  • Loading branch information
yuzelin authored Oct 28, 2024
1 parent 40d1e27 commit b99f8e6
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Projection;
Expand All @@ -41,10 +44,6 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
Expand All @@ -56,7 +55,10 @@
import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;

Expand All @@ -80,6 +82,16 @@
public abstract class BaseDataTableSource extends FlinkTableSource
implements LookupTableSource, SupportsWatermarkPushDown, SupportsAggregatePushDown {

/**
 * Scan options whose keys are stripped from the table options by
 * {@code timeTravelDisabledTable} before the table is handed to the lookup function.
 */
private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
Arrays.asList(
CoreOptions.SCAN_TIMESTAMP,
CoreOptions.SCAN_TIMESTAMP_MILLIS,
CoreOptions.SCAN_WATERMARK,
CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
CoreOptions.SCAN_SNAPSHOT_ID,
CoreOptions.SCAN_TAG_NAME,
CoreOptions.SCAN_VERSION);

protected final ObjectIdentifier tableIdentifier;
protected final boolean streaming;
protected final DynamicTableFactory.Context context;
Expand Down Expand Up @@ -231,6 +243,12 @@ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
"Currently, lookup dim table only support FileStoreTable but is "
+ table.getClass().getName());
}

if (limit != null) {
throw new RuntimeException(
"Limit push down should not happen in Lookup source, but it is " + limit);
Expand All @@ -244,11 +262,34 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
boolean enableAsync = options.get(LOOKUP_ASYNC);
int asyncThreadNumber = options.get(LOOKUP_ASYNC_THREAD_NUMBER);
return LookupRuntimeProviderFactory.create(
new FileStoreLookupFunction(table, projection, joinKey, predicate),
getFileStoreLookupFunction(
context,
timeTravelDisabledTable((FileStoreTable) table),
projection,
joinKey),
enableAsync,
asyncThreadNumber);
}

/**
 * Builds the {@link FileStoreLookupFunction} used by the lookup runtime provider.
 *
 * <p>This implementation does not read {@code context}; it combines the given table,
 * projection and join key with this source's {@code predicate}. NOTE(review): the method is
 * protected, presumably so subclasses can supply a different lookup function - confirm.
 *
 * @param context the Flink lookup context (unused here)
 * @param table the table to look up against
 * @param projection the projected field indexes passed to the lookup function
 * @param joinKey the join key field indexes passed to the lookup function
 * @return a new lookup function over {@code table}
 */
protected FileStoreLookupFunction getFileStoreLookupFunction(
        LookupContext context, Table table, int[] projection, int[] joinKey) {
    FileStoreLookupFunction lookupFunction =
            new FileStoreLookupFunction(table, projection, joinKey, predicate);
    return lookupFunction;
}

/**
 * Returns a copy of {@code table} whose options cannot pin the lookup to a fixed point in
 * time.
 *
 * <p>Every key listed in {@code TIME_TRAVEL_OPTIONS} is dropped from a copy of the table
 * options. The scan mode is then overwritten: it stays {@code COMPACTED_FULL} if that is what
 * the remaining options resolve to, otherwise it becomes {@code LATEST_FULL}.
 *
 * @param table the table whose options are to be sanitized; it is not modified
 * @return a copy of {@code table} built from a schema carrying the adjusted options
 */
private FileStoreTable timeTravelDisabledTable(FileStoreTable table) {
    Map<String, String> lookupOptions = new HashMap<>(table.options());
    for (ConfigOption<?> timeTravelOption : TIME_TRAVEL_OPTIONS) {
        lookupOptions.remove(timeTravelOption.key());
    }

    // Resolve the startup mode only after the time-travel keys are gone.
    CoreOptions.StartupMode configuredMode = CoreOptions.fromMap(lookupOptions).startupMode();
    CoreOptions.StartupMode effectiveMode =
            configuredMode == CoreOptions.StartupMode.COMPACTED_FULL
                    ? configuredMode
                    : CoreOptions.StartupMode.LATEST_FULL;
    lookupOptions.put(CoreOptions.SCAN_MODE.key(), effectiveMode.toString());

    return table.copy(table.schema().copy(lookupOptions));
}

@Override
public boolean applyAggregates(
List<int[]> groupingSets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,45 +134,40 @@ public void testLookup(LookupCacheMode cacheMode) throws Exception {
iterator.close();
}

@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testLookupIgnoreScanOptions(LookupCacheMode cacheMode) throws Exception {
initTable(cacheMode);
sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");

String scanOption;
if (ThreadLocalRandom.current().nextBoolean()) {
scanOption = "'scan.mode'='latest'";
} else {
scanOption = "'scan.snapshot-id'='2'";
}
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ OPTIONS("
+ scanOption
+ ") */"
+ " for system_time as of T.proctime AS D ON T.i = D.i";
BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111, 1111),
Row.of(2, 22, 222, 2222),
Row.of(3, null, null, null));

sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111, 1111),
Row.of(2, 44, 444, 4444),
Row.of(3, 33, 333, 3333),
Row.of(4, null, null, null));

iterator.close();
/**
 * Runs a lookup join whose dimension table carries a {@code scan.snapshot-id} hint and checks
 * the joined rows.
 *
 * <p>NOTE(review): the expected rows (only id 3 matched) presumably follow from the
 * {@code lookup.dynamic-partition = max_pt()} hint restricting the lookup to partition 3, with
 * the snapshot-id hint being ignored - confirm against the lookup implementation.
 */
@Test
public void testLookupIgnoreScanOptions() throws Exception {
    String dimTableDdl =
            "CREATE TABLE d (\n"
                    + " pt INT,\n"
                    + " id INT,\n"
                    + " data STRING,\n"
                    + " PRIMARY KEY (pt, id) NOT ENFORCED\n"
                    + ") PARTITIONED BY (pt) WITH ( 'bucket' = '1', 'continuous.discovery-interval'='1 ms' )";
    String probeTableDdl =
            "CREATE TABLE t1 (\n"
                    + " pt INT,\n"
                    + " id INT,\n"
                    + " data STRING,\n"
                    + " `proctime` AS PROCTIME(),\n"
                    + " PRIMARY KEY (pt, id) NOT ENFORCED\n"
                    + ") PARTITIONED BY (pt) with ( 'continuous.discovery-interval'='1 ms' )";
    sql(dimTableDdl);
    sql(probeTableDdl);

    // Both tables receive the same three rows, one per partition.
    sql("INSERT INTO d VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')");
    sql("INSERT INTO t1 VALUES (1, 1, 'one'), (2, 2, 'two'), (3, 3, 'three')");

    String lookupJoinQuery =
            "SELECT T.pt, T.id, T.data, D.pt, D.id, D.data "
                    + "FROM t1 AS T LEFT JOIN d /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'scan.snapshot-id'='2') */ "
                    + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.id = D.id";
    BlockingIterator<Row, Row> joinedRows = streamSqlBlockIter(lookupJoinQuery);

    List<Row> firstThree = joinedRows.collect(3);
    assertThat(firstThree)
            .containsExactlyInAnyOrder(
                    Row.of(1, 1, "one", null, null, null),
                    Row.of(2, 2, "two", null, null, null),
                    Row.of(3, 3, "three", 3, 3, "three"));

    joinedRows.close();
}

@ParameterizedTest
Expand Down

0 comments on commit b99f8e6

Please sign in to comment.