Skip to content

Commit

Permalink
[core] fix totalRecordCount error when compact in primary key table (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Alibaba-HZY authored Sep 14, 2023
1 parent e5c8e2d commit 18529b8
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
15 changes: 15 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
Expand Down Expand Up @@ -389,6 +390,20 @@ public static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
}

public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}

public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream()
.filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind()))
.mapToLong(manifest -> manifest.file().rowCount())
.sum();
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,12 @@ public boolean tryCommitOnce(
partitionType));
previousChangesListName = manifestList.write(newMetas);

// the added records subtract the deleted records from
long deltaRecordCount =
Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

// write new changes into manifest files
long deltaRecordCount = Snapshot.recordCount(tableFiles);
List<ManifestFileMeta> newChangesManifests = manifestFile.write(tableFiles);
newMetas.addAll(newChangesManifests);
newChangesListName = manifestList.write(newChangesManifests);
Expand Down Expand Up @@ -672,7 +676,7 @@ public boolean tryCommitOnce(
commitKind,
System.currentTimeMillis(),
logOffsets,
previousTotalRecordCount + deltaRecordCount,
totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
currentWatermark);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -132,6 +133,101 @@ public void testPrimaryKeyTable() throws Exception {
GenericRow.of(2, 2, 1));
}

@Test
public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception {
Identifier identifier = identifier("T");
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.INT())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.build();
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);

// snapshot 1: append
write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1));
Snapshot snapshot1 = snapshotManager.snapshot(1);
assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
assertThat(snapshot1.totalRecordCount()).isEqualTo(3L);
assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L);
// snapshot 2: append
write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1));
Snapshot snapshot2 = snapshotManager.snapshot(2);
assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
assertThat(snapshot2.totalRecordCount()).isEqualTo(6L);
assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L);
// snapshot 3: compact
compact(table, row(1), 0);
Snapshot snapshot3 = snapshotManager.snapshot(3);
assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
assertThat(snapshot3.totalRecordCount()).isEqualTo(4L);
assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
System.out.println(snapshot3);
}

@Test
public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception {
Identifier identifier = identifier("T");
Schema schema =
Schema.newBuilder()
.column("pt", DataTypes.INT())
.column("pk", DataTypes.INT())
.column("col1", DataTypes.INT())
.partitionKeys("pt")
.primaryKey("pk", "pt")
.build();
catalog.createTable(identifier, schema, true);
Table table = catalog.getTable(identifier);
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);

// snapshot 1: append
write(
table,
GenericRow.of(1, 1, 1),
GenericRow.of(1, 2, 1),
GenericRow.of(1, 3, 1),
GenericRow.of(2, 1, 1),
GenericRow.of(2, 2, 1));
Snapshot snapshot1 = snapshotManager.snapshot(1);
assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
assertThat(snapshot1.totalRecordCount()).isEqualTo(5L);
assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L);
// snapshot 2: append
write(
table,
GenericRow.of(1, 1, 2),
GenericRow.of(1, 2, 2),
GenericRow.of(1, 4, 1),
GenericRow.of(2, 2, 2),
GenericRow.of(2, 3, 1));
Snapshot snapshot2 = snapshotManager.snapshot(2);
assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
assertThat(snapshot2.totalRecordCount()).isEqualTo(10L);
assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L);
// snapshot 3: compact
compact(table, row(1), 0);

Snapshot snapshot3 = snapshotManager.snapshot(3);

assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
assertThat(snapshot3.totalRecordCount()).isEqualTo(8L);
assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
// snapshot 4: compact
compact(table, row(2), 0);

Snapshot snapshot4 = snapshotManager.snapshot(4);

assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount());
assertThat(snapshot4.totalRecordCount()).isEqualTo(7L);
assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L);
}

@Test
public void testAppendTable() throws Exception {
Identifier identifier = identifier("T");
Expand Down

0 comments on commit 18529b8

Please sign in to comment.