Skip to content

Commit

Permalink
[core] Partitions system table support time travel (#4511)
Browse files Browse the repository at this point in the history
  • Loading branch information
zddr authored Nov 13, 2024
1 parent 30bf503 commit c7dfcfa
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}

List<PartitionEntry> partitions = fileStoreTable.newSnapshotReader().partitionEntries();
List<PartitionEntry> partitions = fileStoreTable.newScan().listPartitionEntries();

RowDataToObjectArrayConverter converter =
new RowDataToObjectArrayConverter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void before() throws Exception {
partitionsTable = (PartitionsTable) catalog.getTable(filesTableId);

// snapshot 1: append
write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 5));
write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 3, 5));

write(table, GenericRow.of(1, 1, 3), GenericRow.of(1, 2, 4));
}
Expand All @@ -85,19 +86,36 @@ public void before() throws Exception {
public void testPartitionRecordCount() throws Exception {
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 2L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 2L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

@Test
public void testPartitionTimeTravel() throws Exception {
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L));

// Only read partition and record count, record size may not stable.
List<InternalRow> result =
read(
partitionsTable.copy(
Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), "1")),
new int[][] {{0}, {1}});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

@Test
public void testPartitionValue() throws Exception {
write(table, GenericRow.of(2, 1, 3), GenericRow.of(3, 1, 4));
List<InternalRow> expectedRow = new ArrayList<>();
expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 4L, 3L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 2L, 2L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 1L, 1L));
expectedRow.add(GenericRow.of(BinaryString.fromString("[3]"), 1L, 1L));

List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}, {3}});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
Expand Down

0 comments on commit c7dfcfa

Please sign in to comment.