Skip to content

Commit

Permalink
[flink] Fix re-open for FullCacheLookupTable implementations (#3149)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Apr 2, 2024
1 parent e2eda7a commit 018f3a6
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,17 @@
public abstract class FullCacheLookupTable implements LookupTable {

protected final Context context;
protected final RocksDBStateFactory stateFactory;
protected final RowType projectedType;

@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final int appendUdsFieldNumber;

protected RocksDBStateFactory stateFactory;
private LookupStreamingReader reader;
private Predicate specificPartition;

public FullCacheLookupTable(Context context) throws IOException {
public FullCacheLookupTable(Context context) {
this.context = context;
this.stateFactory =
new RocksDBStateFactory(
context.tempPath.toString(),
context.table.coreOptions().toConfiguration(),
null);
FileStoreTable table = context.table;
List<String> sequenceFields = new ArrayList<>();
if (table.primaryKeys().size() > 0) {
Expand Down Expand Up @@ -104,8 +99,15 @@ public void specificPartitionFilter(Predicate filter) {
this.specificPartition = filter;
}

@Override
public void open() throws Exception {
protected void openStateFactory() throws Exception {
this.stateFactory =
new RocksDBStateFactory(
context.tempPath.toString(),
context.table.coreOptions().toConfiguration(),
null);
}

protected void bootstrap() throws Exception {
Predicate scanPredicate =
PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
this.reader = new LookupStreamingReader(context.table, context.projection, scanPredicate);
Expand Down Expand Up @@ -198,7 +200,7 @@ public interface TableBulkLoader {
void finish() throws IOException;
}

static FullCacheLookupTable create(Context context, long lruCacheSize) throws IOException {
static FullCacheLookupTable create(Context context, long lruCacheSize) {
List<String> primaryKeys = context.table.primaryKeys();
if (primaryKeys.isEmpty()) {
return new NoPrimaryKeyLookupTable(context, lruCacheSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class NoPrimaryKeyLookupTable extends FullCacheLookupTable {

private RocksDBListState<InternalRow, InternalRow> state;

public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws IOException {
public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) {
super(context);
this.lruCacheSize = lruCacheSize;
List<String> fieldNames = projectedType.getFieldNames();
Expand All @@ -52,14 +52,15 @@ public NoPrimaryKeyLookupTable(Context context, long lruCacheSize) throws IOExce

@Override
public void open() throws Exception {
openStateFactory();
this.state =
stateFactory.listState(
"join-key-index",
InternalSerializers.create(
TypeUtils.project(projectedType, joinKeyRow.indexMapping())),
InternalSerializers.create(projectedType),
lruCacheSize);
super.open();
bootstrap();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public class PrimaryKeyLookupTable extends FullCacheLookupTable {

protected RocksDBValueState<InternalRow, InternalRow> tableState;

public PrimaryKeyLookupTable(Context context, long lruCacheSize, List<String> joinKey)
throws IOException {
public PrimaryKeyLookupTable(Context context, long lruCacheSize, List<String> joinKey) {
super(context);
this.lruCacheSize = lruCacheSize;
List<String> fieldNames = projectedType.getFieldNames();
Expand All @@ -71,14 +70,19 @@ public PrimaryKeyLookupTable(Context context, long lruCacheSize, List<String> jo

@Override
public void open() throws Exception {
openStateFactory();
createTableState();
bootstrap();
}

protected void createTableState() throws IOException {
this.tableState =
stateFactory.valueState(
"table",
InternalSerializers.create(
TypeUtils.project(projectedType, primaryKeyRow.indexMapping())),
InternalSerializers.create(projectedType),
lruCacheSize);
super.open();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class SecondaryIndexLookupTable extends PrimaryKeyLookupTable {

private RocksDBSetState<InternalRow, InternalRow> indexState;

public SecondaryIndexLookupTable(Context context, long lruCacheSize) throws IOException {
public SecondaryIndexLookupTable(Context context, long lruCacheSize) {
super(context, lruCacheSize / 2, context.table.primaryKeys());
List<String> fieldNames = projectedType.getFieldNames();
int[] secKeyMapping = context.joinKey.stream().mapToInt(fieldNames::indexOf).toArray();
Expand All @@ -47,6 +47,8 @@ public SecondaryIndexLookupTable(Context context, long lruCacheSize) throws IOEx

@Override
public void open() throws Exception {
openStateFactory();
createTableState();
this.indexState =
stateFactory.setState(
"sec-index",
Expand All @@ -55,7 +57,7 @@ public void open() throws Exception {
InternalSerializers.create(
TypeUtils.project(projectedType, primaryKeyRow.indexMapping())),
lruCacheSize);
super.open();
bootstrap();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ public void testPkTable() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

// test bulk load error
{
TableBulkLoader bulkLoader = table.createBulkLoader();
Expand Down Expand Up @@ -175,6 +179,10 @@ public void testPkTableWithSequenceField() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

List<Pair<byte[], byte[]>> records = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
InternalRow row = row(i, 11 * i, 111 * i);
Expand Down Expand Up @@ -219,6 +227,10 @@ public void testPkTablePkFilter() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
Expand Down Expand Up @@ -250,6 +262,10 @@ public void testPkTableNonPkFilter() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(1));
assertThat(result).hasSize(1);
Expand All @@ -274,6 +290,10 @@ public void testSecKeyTable() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
Expand Down Expand Up @@ -319,6 +339,10 @@ public void testSecKeyTableWithSequenceField() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
Map<Integer, Set<Integer>> secKeyToPk = new HashMap<>();
Expand Down Expand Up @@ -367,6 +391,10 @@ public void testSecKeyTablePkFilter() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

table.refresh(singletonList(row(1, 11, 111)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(1);
Expand Down Expand Up @@ -407,6 +435,10 @@ public void testNoPrimaryKeyTable() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

// test bulk load 100_000 records
List<Pair<byte[], byte[]>> records = new ArrayList<>();
Random rnd = new Random();
Expand Down Expand Up @@ -450,6 +482,10 @@ public void testNoPrimaryKeyTableFilter() throws Exception {
table = FullCacheLookupTable.create(context, ThreadLocalRandom.current().nextInt(2) * 10);
table.open();

// test re-open
table.close();
table.open();

table.refresh(singletonList(row(1, 11, 333)).iterator());
List<InternalRow> result = table.get(row(11));
assertThat(result).hasSize(0);
Expand Down Expand Up @@ -508,6 +544,11 @@ public void testPartialLookupTableWithProjection() throws Exception {
tempDir.toFile(),
ImmutableList.of("pk1", "pk2"));
table.open();

// test re-open
table.close();
table.open();

List<InternalRow> result = table.get(row(1, -1));
assertThat(result).hasSize(0);

Expand Down Expand Up @@ -535,6 +576,10 @@ public void testPartialLookupTableJoinKeyOrder() throws Exception {
ImmutableList.of("pk2", "pk1"));
table.open();

// test re-open
table.close();
table.open();

List<InternalRow> result = table.get(row(-1, 1));
assertThat(result).hasSize(0);

Expand Down

0 comments on commit 018f3a6

Please sign in to comment.