Skip to content

Commit

Permalink
[flink] Introduce max_two_pt for Flink lookup join (apache#4772)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin authored Dec 25, 2024
1 parent bf58ddd commit f629481
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand All @@ -45,22 +46,27 @@ public class DynamicPartitionLoader implements Serializable {

private static final String MAX_PT = "max_pt()";

private static final String MAX_TWO_PT = "max_two_pt()";

private final Table table;
private final Duration refreshInterval;
private final int maxPartitionNum;

private Comparator<InternalRow> comparator;

private LocalDateTime lastRefresh;
@Nullable private BinaryRow partition;
private List<BinaryRow> partitions;

private DynamicPartitionLoader(Table table, Duration refreshInterval) {
/**
 * Creates a loader that tracks the newest partitions of the given table.
 *
 * @param table the Paimon table whose partitions are scanned
 * @param refreshInterval minimum interval between two partition refreshes
 * @param maxPartitionNum maximum number of latest partitions to keep
 *     (1 for {@code max_pt()}, 2 for {@code max_two_pt()})
 */
private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
    this.maxPartitionNum = maxPartitionNum;
    this.refreshInterval = refreshInterval;
    this.table = table;
}

/**
 * Initializes the generated partition comparator and resets the cached partition
 * list to empty; must be called before the first {@code checkRefresh()}.
 */
public void open() {
    RowType keyType = table.rowType().project(table.partitionKeys());
    comparator = CodeGenUtils.newRecordComparator(keyType.getFieldTypes());
    partitions = Collections.emptyList();
}

public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
Expand All @@ -71,9 +77,8 @@ public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields
partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
}

@Nullable
public BinaryRow partition() {
return partition;
/**
 * Returns the currently cached latest partitions; empty until the first successful
 * refresh (initialized to an empty list in {@code open()}).
 */
public List<BinaryRow> partitions() {
    return partitions;
}

/** @return true if partition changed. */
Expand All @@ -83,14 +88,34 @@ public boolean checkRefresh() {
return false;
}

BinaryRow previous = this.partition;
partition =
table.newReadBuilder().newScan().listPartitions().stream()
.max(comparator)
.orElse(null);
List<BinaryRow> newPartitions = getMaxPartitions();
lastRefresh = LocalDateTime.now();

return !Objects.equals(previous, partition);
if (newPartitions.size() != partitions.size()) {
partitions = newPartitions;
return true;
} else {
for (int i = 0; i < newPartitions.size(); i++) {
if (comparator.compare(newPartitions.get(i), partitions.get(i)) != 0) {
partitions = newPartitions;
return true;
}
}
return false;
}
}

/**
 * Scans the table and returns its newest partitions, ordered from newest to oldest
 * and capped at {@code maxPartitionNum} entries.
 *
 * <p>Capping is done with {@code limit} inside the stream rather than sorting into a
 * full list and returning a {@code subList}: a subList is only a view that keeps the
 * entire backing list reachable.
 *
 * @return up to {@code maxPartitionNum} partitions in descending comparator order;
 *     empty if the table currently has no partitions
 */
private List<BinaryRow> getMaxPartitions() {
    return table.newReadBuilder().newScan().listPartitions().stream()
            .sorted(comparator.reversed())
            .limit(maxPartitionNum)
            .collect(Collectors.toList());
}

@Nullable
Expand All @@ -101,13 +126,21 @@ public static DynamicPartitionLoader of(Table table) {
return null;
}

if (!dynamicPartition.equalsIgnoreCase(MAX_PT)) {
throw new UnsupportedOperationException(
"Unsupported dynamic partition pattern: " + dynamicPartition);
int maxPartitionNum;
switch (dynamicPartition.toLowerCase()) {
case MAX_PT:
maxPartitionNum = 1;
break;
case MAX_TWO_PT:
maxPartitionNum = 2;
break;
default:
throw new UnsupportedOperationException(
"Unsupported dynamic partition pattern: " + dynamicPartition);
}

Duration refresh =
options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
return new DynamicPartitionLoader(table, refresh);
return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
Expand Down Expand Up @@ -203,9 +204,9 @@ private void open() throws Exception {
if (partitionLoader != null) {
partitionLoader.open();
partitionLoader.checkRefresh();
BinaryRow partition = partitionLoader.partition();
if (partition != null) {
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
List<BinaryRow> partitions = partitionLoader.partitions();
if (!partitions.isEmpty()) {
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
}
}

Expand Down Expand Up @@ -236,17 +237,17 @@ public Collection<RowData> lookup(RowData keyRow) {
tryRefresh();

InternalRow key = new FlinkRowWrapper(keyRow);
if (partitionLoader != null) {
if (partitionLoader.partition() == null) {
return Collections.emptyList();
}
key = JoinedRow.join(key, partitionLoader.partition());
if (partitionLoader == null) {
return lookupInternal(key);
}

if (partitionLoader.partitions().isEmpty()) {
return Collections.emptyList();
}

List<InternalRow> results = lookupTable.get(key);
List<RowData> rows = new ArrayList<>(results.size());
for (InternalRow matchedRow : results) {
rows.add(new FlinkRowData(matchedRow));
List<RowData> rows = new ArrayList<>();
for (BinaryRow partition : partitionLoader.partitions()) {
rows.addAll(lookupInternal(JoinedRow.join(key, partition)));
}
return rows;
} catch (OutOfRangeException | ReopenException e) {
Expand All @@ -257,7 +258,28 @@ public Collection<RowData> lookup(RowData keyRow) {
}
}

private Predicate createSpecificPartFilter(BinaryRow partition) {
/**
 * Looks up {@code key} in the lookup table and wraps each matched row into Flink's
 * {@link RowData} representation.
 *
 * @param key the (possibly partition-joined) lookup key
 * @return the matched rows as {@link FlinkRowData}; empty when nothing matches
 * @throws IOException if the underlying lookup table access fails
 */
private List<RowData> lookupInternal(InternalRow key) throws IOException {
    List<InternalRow> lookupResults = lookupTable.get(key);
    // Presize: the result count is known up front, avoiding ArrayList regrowth
    // (the pre-refactor inline code also presized this list).
    List<RowData> rows = new ArrayList<>(lookupResults.size());
    for (InternalRow matchedRow : lookupResults) {
        rows.add(new FlinkRowData(matchedRow));
    }
    return rows;
}

/**
 * Builds a predicate matching rows belonging to any of the given partitions by
 * OR-ing the per-partition filters together.
 *
 * <p>Returns {@code null} for an empty list; callers check for emptiness first.
 */
private Predicate createSpecificPartFilter(List<BinaryRow> partitions) {
    Predicate combined = null;
    for (BinaryRow part : partitions) {
        Predicate single = createSinglePartFilter(part);
        combined = combined == null ? single : PredicateBuilder.or(combined, single);
    }
    return combined;
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec =
Expand Down Expand Up @@ -291,15 +313,15 @@ void tryRefresh() throws Exception {
// 2. refresh dynamic partition
if (partitionLoader != null) {
boolean partitionChanged = partitionLoader.checkRefresh();
BinaryRow partition = partitionLoader.partition();
if (partition == null) {
List<BinaryRow> partitions = partitionLoader.partitions();
if (partitions.isEmpty()) {
// no data to be load, fast exit
return;
}

if (partitionChanged) {
// reopen with latest partition
lookupTable.specificPartitionFilter(createSpecificPartFilter(partition));
lookupTable.specificPartitionFilter(createSpecificPartFilter(partitions));
lookupTable.close();
lookupTable.open();
// no need to refresh the lookup table because it is reopened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,4 +1006,43 @@ public void testOverwriteDimTable(boolean isPkTable) throws Exception {

iterator.close();
}

// Verifies max_two_pt(): the lookup join must read the TWO newest partitions of the
// dim table (not just the single newest, as max_pt() does), and must follow partition
// additions and drops across refreshes. Runs once per lookup cache mode.
@ParameterizedTest
@EnumSource(LookupCacheMode.class)
public void testLookupMaxTwoPt0(LookupCacheMode mode) throws Exception {
    sql(
            "CREATE TABLE PARTITIONED_DIM (pt STRING, i INT, v INT)"
                    + "PARTITIONED BY (`pt`) WITH ("
                    + "'lookup.dynamic-partition' = 'max_two_pt()', "
                    + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
                    + "'lookup.cache' = '%s', "
                    + "'continuous.discovery-interval'='1 ms')",
            mode);

    String query =
            "SELECT D.pt, T.i, D.v FROM T LEFT JOIN PARTITIONED_DIM for SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
    BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

    // Phase 1: only one partition exists; key 1 matches in it.
    sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-01', 1, 1), ('2024-10-01', 2, 2)");
    Thread.sleep(500); // wait refresh
    sql("INSERT INTO T VALUES (1)");
    List<Row> result = iterator.collect(1);
    assertThat(result).containsExactlyInAnyOrder(Row.of("2024-10-01", 1, 1));

    // Phase 2: a second (newer) partition appears; key 2 now matches in BOTH of the
    // two newest partitions, so the join emits two rows for a single probe record.
    sql("INSERT INTO PARTITIONED_DIM VALUES ('2024-10-02', 2, 2)");
    Thread.sleep(500); // wait refresh
    sql("INSERT INTO T VALUES (2)");
    result = iterator.collect(2);
    assertThat(result)
            .containsExactlyInAnyOrder(Row.of("2024-10-01", 2, 2), Row.of("2024-10-02", 2, 2));

    // Phase 3: drop the older partition; key 1 no longer matches anywhere (LEFT JOIN
    // yields nulls), while key 2 still matches in the one remaining partition.
    sql("ALTER TABLE PARTITIONED_DIM DROP PARTITION (pt = '2024-10-01')");
    Thread.sleep(500); // wait refresh
    sql("INSERT INTO T VALUES (1), (2)");
    result = iterator.collect(2);
    assertThat(result)
            .containsExactlyInAnyOrder(Row.of(null, 1, null), Row.of("2024-10-02", 2, 2));

    iterator.close();
}
}

0 comments on commit f629481

Please sign in to comment.