diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 15ebf8e1d898..0bbeda7d1a7d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -20,6 +20,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -389,6 +390,20 @@ public static long recordCount(List manifestEntries) { return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); } + public static long recordCountAdd(List manifestEntries) { + return manifestEntries.stream() + .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) + .mapToLong(manifest -> manifest.file().rowCount()) + .sum(); + } + + public static long recordCountDelete(List manifestEntries) { + return manifestEntries.stream() + .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) + .mapToLong(manifest -> manifest.file().rowCount()) + .sum(); + } + public String toJson() { return JsonSerdeUtil.toJson(this); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 59dee53588e3..45d943ee475d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -640,8 +640,12 @@ public boolean tryCommitOnce( partitionType)); previousChangesListName = manifestList.write(newMetas); + // the added records subtract the deleted records from + long deltaRecordCount = + Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles); + long totalRecordCount = previousTotalRecordCount + deltaRecordCount; + // write new changes into manifest files - long deltaRecordCount = Snapshot.recordCount(tableFiles); List newChangesManifests = manifestFile.write(tableFiles); newMetas.addAll(newChangesManifests); newChangesListName = manifestList.write(newChangesManifests); @@ -672,7 +676,7 @@ public boolean tryCommitOnce( commitKind, System.currentTimeMillis(), logOffsets, - previousTotalRecordCount + deltaRecordCount, + totalRecordCount, deltaRecordCount, Snapshot.recordCount(changelogFiles), currentWatermark); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java index dd348e5bc82c..629a74a44990 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -132,6 +133,101 @@ public void testPrimaryKeyTable() throws Exception { GenericRow.of(2, 2, 1)); } + @Test + public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception { + Identifier identifier = identifier("T"); + Schema schema = + Schema.newBuilder() + .column("pt", DataTypes.INT()) + .column("pk", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .build(); + catalog.createTable(identifier, schema, true); + Table table = catalog.getTable(identifier); + Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T")); + SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath); + + // snapshot 1: append + write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1)); + Snapshot snapshot1 = snapshotManager.snapshot(1); + assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount()); + assertThat(snapshot1.totalRecordCount()).isEqualTo(3L); + assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L); + // snapshot 2: append + write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); + Snapshot snapshot2 = snapshotManager.snapshot(2); + assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount()); + assertThat(snapshot2.totalRecordCount()).isEqualTo(6L); + assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L); + // snapshot 3: compact + compact(table, row(1), 0); + Snapshot snapshot3 = snapshotManager.snapshot(3); + assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount()); + assertThat(snapshot3.totalRecordCount()).isEqualTo(4L); + assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L); + System.out.println(snapshot3); + } + + @Test + public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception { + Identifier identifier = identifier("T"); + Schema schema = + Schema.newBuilder() + .column("pt", DataTypes.INT()) + .column("pk", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .build(); + catalog.createTable(identifier, schema, true); + Table table = catalog.getTable(identifier); + Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T")); + SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath); + + // snapshot 1: append + write( + table, + GenericRow.of(1, 1, 1), + GenericRow.of(1, 2, 1), + GenericRow.of(1, 3, 1), + GenericRow.of(2, 1, 1), + GenericRow.of(2, 2, 1)); + Snapshot snapshot1 = snapshotManager.snapshot(1); + assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount()); + assertThat(snapshot1.totalRecordCount()).isEqualTo(5L); + assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L); + // snapshot 2: append + write( + table, + GenericRow.of(1, 1, 2), + GenericRow.of(1, 2, 2), + GenericRow.of(1, 4, 1), + GenericRow.of(2, 2, 2), + GenericRow.of(2, 3, 1)); + Snapshot snapshot2 = snapshotManager.snapshot(2); + assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount()); + assertThat(snapshot2.totalRecordCount()).isEqualTo(10L); + assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L); + // snapshot 3: compact + compact(table, row(1), 0); + + Snapshot snapshot3 = snapshotManager.snapshot(3); + + assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount()); + assertThat(snapshot3.totalRecordCount()).isEqualTo(8L); + assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L); + // snapshot 4: compact + compact(table, row(2), 0); + + Snapshot snapshot4 = snapshotManager.snapshot(4); + + assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount()); + assertThat(snapshot4.totalRecordCount()).isEqualTo(7L); + assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L); + } + @Test public void testAppendTable() throws Exception { Identifier identifier = identifier("T");