Skip to content

Commit

Permalink
[flink] Fix inconsistency problem when lookup join table with sequenc…
Browse files Browse the repository at this point in the history
…e field

This closes apache#2622
  • Loading branch information
Aitozi authored and JingsongLi committed Jan 5, 2024
1 parent e856f3b commit ddd717e
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) throw

return parallelExecute(
TypeUtils.project(rowType, keyProjection),
readBuilder,
s -> readBuilder.newRead().createReader(s),
splits,
options.pageSize(),
options.crossPartitionUpsertBootstrapParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallelExecution;
import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
Expand All @@ -45,7 +45,7 @@ public class SplitsParallelReadUtil {

public static RecordReader<InternalRow> parallelExecute(
RowType projectedType,
ReadBuilder readBuilder,
FunctionWithException<Split, RecordReader<InternalRow>, IOException> readBuilder,
List<Split> splits,
int pageSize,
int parallelism) {
Expand All @@ -61,7 +61,7 @@ public static RecordReader<InternalRow> parallelExecute(

public static <EXTRA> RecordReader<InternalRow> parallelExecute(
RowType projectedType,
ReadBuilder readBuilder,
FunctionWithException<Split, RecordReader<InternalRow>, IOException> readBuilder,
List<Split> splits,
int pageSize,
int parallelism,
Expand All @@ -72,8 +72,7 @@ public static <EXTRA> RecordReader<InternalRow> parallelExecute(
suppliers.add(
() -> {
try {
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(split);
RecordReader<InternalRow> reader = readBuilder.apply(split);
return Pair.of(reader, extraFunction.apply(split));
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public final RecordReader<InternalRow> reader(Split split) throws IOException {
return new RowDataRecordReader(read.createReader((DataSplit) split));
}

public final RecordReader<KeyValue> kvReader(Split split) throws IOException {
return read.createReader((DataSplit) split);
}

protected abstract RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
RecordReader.RecordIterator<KeyValue> kvRecordIterator);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private transient long nextLoadTime;
private transient TableStreamingReader streamingReader;

private final boolean sequenceFieldEnabled;

public FileStoreLookupFunction(
Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
TableScanUtils.streamingReadingValidate(table);
Expand All @@ -119,6 +121,9 @@ public FileStoreLookupFunction(
}

this.predicate = predicate;
this.sequenceFieldEnabled =
table.primaryKeys().size() > 0
&& new CoreOptions(table.options()).sequenceField().isPresent();
}

public void open(FunctionContext context) throws Exception {
Expand Down Expand Up @@ -151,7 +156,8 @@ private void open() throws Exception {
table.primaryKeys(),
joinKeys,
recordFilter,
options.get(LOOKUP_CACHE_ROWS));
options.get(LOOKUP_CACHE_ROWS),
sequenceFieldEnabled);
this.nextLoadTime = -1;
this.streamingReader = new TableStreamingReader(table, projection, this.predicate);
bulkLoad(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,30 @@ static LookupTable create(
List<String> primaryKey,
List<String> joinKey,
Predicate<InternalRow> recordFilter,
long lruCacheSize)
long lruCacheSize,
boolean sequenceFieldEnabled)
throws IOException {
if (primaryKey.isEmpty()) {
return new NoPrimaryKeyLookupTable(
stateFactory, rowType, joinKey, recordFilter, lruCacheSize);
} else {
if (new HashSet<>(primaryKey).equals(new HashSet<>(joinKey))) {
return new PrimaryKeyLookupTable(
stateFactory, rowType, joinKey, recordFilter, lruCacheSize);
stateFactory,
rowType,
joinKey,
recordFilter,
lruCacheSize,
sequenceFieldEnabled);
} else {
return new SecondaryIndexLookupTable(
stateFactory, rowType, primaryKey, joinKey, recordFilter, lruCacheSize);
stateFactory,
rowType,
primaryKey,
joinKey,
recordFilter,
lruCacheSize,
sequenceFieldEnabled);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.lookup.RocksDBValueState;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
Expand All @@ -44,26 +46,41 @@ public class PrimaryKeyLookupTable implements LookupTable {
protected int[] primaryKeyMapping;

protected final KeyProjectedRow primaryKey;
protected final boolean sequenceFieldEnabled;

protected final int sequenceIndex;

public PrimaryKeyLookupTable(
RocksDBStateFactory stateFactory,
RowType rowType,
List<String> primaryKey,
Predicate<InternalRow> recordFilter,
long lruCacheSize)
long lruCacheSize,
boolean sequenceFieldEnabled)
throws IOException {
List<String> fieldNames = rowType.getFieldNames();
this.primaryKeyMapping = primaryKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.primaryKey = new KeyProjectedRow(primaryKeyMapping);
this.sequenceFieldEnabled = sequenceFieldEnabled;
// append sequence number at last column when sequence field is attached.
RowType adjustedRowType = appendSequenceNumber(rowType);
this.sequenceIndex = adjustedRowType.getFieldCount() - 1;

this.tableState =
stateFactory.valueState(
"table",
InternalSerializers.create(TypeUtils.project(rowType, primaryKeyMapping)),
InternalSerializers.create(rowType),
InternalSerializers.create(adjustedRowType),
lruCacheSize);
this.recordFilter = recordFilter;
}

public static RowType appendSequenceNumber(RowType rowType) {
List<DataType> types = rowType.getFieldTypes();
types.add(DataTypes.BIGINT());
return RowType.of(types.toArray(new DataType[0]));
}

@Override
public List<InternalRow> get(InternalRow key) throws IOException {
InternalRow value = tableState.get(key);
Expand All @@ -75,6 +92,15 @@ public void refresh(Iterator<InternalRow> incremental) throws IOException {
while (incremental.hasNext()) {
InternalRow row = incremental.next();
primaryKey.replaceRow(row);
if (sequenceFieldEnabled) {
InternalRow previous = tableState.get(primaryKey);
// only update the kv when the new value's sequence number is higher.
if (previous != null
&& previous.getLong(sequenceIndex) > row.getLong(sequenceIndex)) {
continue;
}
}

if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
if (recordFilter.test(row)) {
tableState.put(primaryKey, row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,16 @@ public SecondaryIndexLookupTable(
List<String> primaryKey,
List<String> secKey,
Predicate<InternalRow> recordFilter,
long lruCacheSize)
long lruCacheSize,
boolean sequenceFieldEnabled)
throws IOException {
super(stateFactory, rowType, primaryKey, recordFilter, lruCacheSize / 2);
super(
stateFactory,
rowType,
primaryKey,
recordFilter,
lruCacheSize / 2,
sequenceFieldEnabled);
List<String> fieldNames = rowType.getFieldNames();
int[] secKeyMapping = secKey.stream().mapToInt(fieldNames::indexOf).toArray();
this.secKeyRow = new KeyProjectedRow(secKeyMapping);
Expand All @@ -65,9 +72,9 @@ public List<InternalRow> get(InternalRow key) throws IOException {
List<InternalRow> pks = indexState.get(key);
List<InternalRow> values = new ArrayList<>(pks.size());
for (InternalRow pk : pks) {
InternalRow value = tableState.get(pk);
if (value != null) {
values.add(value);
InternalRow row = tableState.get(pk);
if (row != null) {
values.add(row);
}
}
return values;
Expand All @@ -78,8 +85,23 @@ public void refresh(Iterator<InternalRow> incremental) throws IOException {
while (incremental.hasNext()) {
InternalRow row = incremental.next();
primaryKey.replaceRow(row);

boolean previousFetched = false;
InternalRow previous = null;
if (sequenceFieldEnabled) {
previous = tableState.get(primaryKey);
previousFetched = true;
// only update the kv when the new value's sequence number is higher.
if (previous != null
&& previous.getLong(sequenceIndex) > row.getLong(sequenceIndex)) {
continue;
}
}

if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
InternalRow previous = tableState.get(primaryKey);
if (!previousFetched) {
previous = tableState.get(primaryKey);
}
if (previous != null) {
indexState.retract(secKeyRow.replaceRow(previous), primaryKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.paimon.flink.lookup;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.io.SplitsParallelReadUtil;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.options.Options;
Expand All @@ -28,11 +30,14 @@
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.KeyValueTableRead;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.TypeUtils;

import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
Expand All @@ -58,17 +63,21 @@ public class TableStreamingReader {
@Nullable private final PredicateFilter recordFilter;
private final StreamTableScan scan;

private final boolean sequenceFieldEnabled;

public TableStreamingReader(Table table, int[] projection, @Nullable Predicate predicate) {
this.table = table;
this.projection = projection;
if (CoreOptions.fromMap(table.options()).startupMode()
!= CoreOptions.StartupMode.COMPACTED_FULL) {
CoreOptions options = CoreOptions.fromMap(table.options());
if (options.startupMode() != CoreOptions.StartupMode.COMPACTED_FULL) {
table =
table.copy(
Collections.singletonMap(
CoreOptions.SCAN_MODE.key(),
CoreOptions.StartupMode.LATEST_FULL.toString()));
}
this.sequenceFieldEnabled =
table.primaryKeys().size() > 0 && options.sequenceField().isPresent();

this.readBuilder = table.newReadBuilder().withProjection(projection).withFilter(predicate);
scan = readBuilder.newStreamScan();
Expand Down Expand Up @@ -110,21 +119,27 @@ public RecordReader<InternalRow> nextBatch(boolean useParallelism) throws Except

private RecordReader<InternalRow> read(TableScan.Plan plan, boolean useParallelism)
throws IOException {
CoreOptions options = CoreOptions.fromMap(table.options());
FunctionWithException<Split, RecordReader<InternalRow>, IOException> readerSupplier =
sequenceFieldEnabled ? createKvReaderSupplier() : createReaderSupplier();

RowType rowType = TypeUtils.project(table.rowType(), projection);
// append sequence number at the last column.
rowType = PrimaryKeyLookupTable.appendSequenceNumber(rowType);

RecordReader<InternalRow> reader;
if (useParallelism) {
CoreOptions options = CoreOptions.fromMap(table.options());
reader =
SplitsParallelReadUtil.parallelExecute(
TypeUtils.project(table.rowType(), projection),
readBuilder,
rowType,
readerSupplier,
plan.splits(),
options.pageSize(),
new Options(table.options()).get(LOOKUP_BOOTSTRAP_PARALLELISM));
} else {
TableRead read = readBuilder.newRead();
List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : plan.splits()) {
readers.add(() -> read.createReader(split));
readers.add(() -> readerSupplier.apply(split));
}
reader = ConcatRecordReader.create(readers);
}
Expand All @@ -134,4 +149,35 @@ private RecordReader<InternalRow> read(TableScan.Plan plan, boolean useParalleli
}
return reader;
}

private FunctionWithException<Split, RecordReader<InternalRow>, IOException>
createKvReaderSupplier() {
KeyValueTableRead read = (KeyValueTableRead) readBuilder.newRead();
return split -> {
JoinedRow reused = new JoinedRow();
return read.kvReader(split)
.transform(
kv -> {
reused.replace(kv.value(), GenericRow.of(kv.sequenceNumber()));
reused.setRowKind(kv.valueKind());
return reused;
});
};
}

private FunctionWithException<Split, RecordReader<InternalRow>, IOException>
createReaderSupplier() {
TableRead read = readBuilder.newRead();

return split -> {
JoinedRow reused = new JoinedRow();
return read.createReader(split)
.transform(
row -> {
reused.replace(row, GenericRow.of(-1L));
reused.setRowKind(row.getRowKind());
return reused;
});
};
}
}
Loading

0 comments on commit ddd717e

Please sign in to comment.