From 5d94c13e3f982b391e0ef49b8b0fd8a31687c64d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?= Date: Wed, 13 Sep 2023 12:01:04 +0800 Subject: [PATCH] [core] fix totalRecordCount incorrect when compact in primary key table #2009 --- 2009.patch | 186 ++++++++++++++++++ .../main/java/org/apache/paimon/Snapshot.java | 15 ++ .../paimon/operation/FileStoreCommitImpl.java | 8 +- .../table/IncrementalTimeStampTableTest.java | 96 +++++++++ 4 files changed, 303 insertions(+), 2 deletions(-) create mode 100644 2009.patch diff --git a/2009.patch b/2009.patch new file mode 100644 index 000000000000..312b190226d6 --- /dev/null +++ b/2009.patch @@ -0,0 +1,186 @@ +From 3440d2220a7f11fd77101772d33c72769cf0acf9 Mon Sep 17 00:00:00 2001 +From: "daowu.hzy" +Date: Wed, 13 Sep 2023 23:38:41 +0800 +Subject: [PATCH] [core] fix totalRecordCount error when compact in primary key + table + +--- + .../main/java/org/apache/paimon/Snapshot.java | 15 +++ + .../paimon/operation/FileStoreCommitImpl.java | 8 +- + .../table/IncrementalTimeStampTableTest.java | 96 +++++++++++++++++++ + 3 files changed, 117 insertions(+), 2 deletions(-) + +diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +index 15ebf8e1d89..0bbeda7d1a7 100644 +--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java ++++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +@@ -20,6 +20,7 @@ + + import org.apache.paimon.fs.FileIO; + import org.apache.paimon.fs.Path; ++import org.apache.paimon.manifest.FileKind; + import org.apache.paimon.manifest.ManifestEntry; + import org.apache.paimon.manifest.ManifestFileMeta; + import org.apache.paimon.manifest.ManifestList; +@@ -389,6 +390,20 @@ public static long recordCount(List manifestEntries) { + return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); + } + ++ public static long recordCountAdd(List manifestEntries) { ++ return manifestEntries.stream() ++ .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) ++ .mapToLong(manifest -> manifest.file().rowCount()) ++ .sum(); ++ } ++ ++ public static long recordCountDelete(List manifestEntries) { ++ return manifestEntries.stream() ++ .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) ++ .mapToLong(manifest -> manifest.file().rowCount()) ++ .sum(); ++ } ++ + public String toJson() { + return JsonSerdeUtil.toJson(this); + } +diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +index 59dee53588e..45d943ee475 100644 +--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java ++++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +@@ -640,8 +640,12 @@ public boolean tryCommitOnce( + partitionType)); + previousChangesListName = manifestList.write(newMetas); + ++ // the added records subtract the deleted records from ++ long deltaRecordCount = ++ Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles); ++ long totalRecordCount = previousTotalRecordCount + deltaRecordCount; ++ + // write new changes into manifest files +- long deltaRecordCount = Snapshot.recordCount(tableFiles); + List newChangesManifests = manifestFile.write(tableFiles); + newMetas.addAll(newChangesManifests); + newChangesListName = manifestList.write(newChangesManifests); +@@ -672,7 +676,7 @@ public boolean tryCommitOnce( + commitKind, + System.currentTimeMillis(), + logOffsets, +- previousTotalRecordCount + deltaRecordCount, ++ totalRecordCount, + deltaRecordCount, + Snapshot.recordCount(changelogFiles), + currentWatermark); +diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java +index dd348e5bc82..629a74a4499 100644 +--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java ++++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java +@@ -19,6 +19,7 @@ + package org.apache.paimon.table; + + import org.apache.paimon.CoreOptions; ++import org.apache.paimon.Snapshot; + import org.apache.paimon.catalog.Identifier; + import org.apache.paimon.data.GenericRow; + import org.apache.paimon.data.InternalRow; +@@ -132,6 +133,101 @@ public void testPrimaryKeyTable() throws Exception { + GenericRow.of(2, 2, 1)); + } + ++ @Test ++ public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception { ++ Identifier identifier = identifier("T"); ++ Schema schema = ++ Schema.newBuilder() ++ .column("pt", DataTypes.INT()) ++ .column("pk", DataTypes.INT()) ++ .column("col1", DataTypes.INT()) ++ .partitionKeys("pt") ++ .primaryKey("pk", "pt") ++ .build(); ++ catalog.createTable(identifier, schema, true); ++ Table table = catalog.getTable(identifier); ++ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T")); ++ SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath); ++ ++ // snapshot 1: append ++ write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1)); ++ Snapshot snapshot1 = snapshotManager.snapshot(1); ++ assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount()); ++ assertThat(snapshot1.totalRecordCount()).isEqualTo(3L); ++ assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L); ++ // snapshot 2: append ++ write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); ++ Snapshot snapshot2 = snapshotManager.snapshot(2); ++ assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount()); ++ assertThat(snapshot2.totalRecordCount()).isEqualTo(6L); ++ assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L); ++ // snapshot 3: compact ++ compact(table, row(1), 0); ++ Snapshot snapshot3 = snapshotManager.snapshot(3); ++ assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount()); ++ assertThat(snapshot3.totalRecordCount()).isEqualTo(4L); ++ assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L); ++ System.out.println(snapshot3); ++ } ++ ++ @Test ++ public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception { ++ Identifier identifier = identifier("T"); ++ Schema schema = ++ Schema.newBuilder() ++ .column("pt", DataTypes.INT()) ++ .column("pk", DataTypes.INT()) ++ .column("col1", DataTypes.INT()) ++ .partitionKeys("pt") ++ .primaryKey("pk", "pt") ++ .build(); ++ catalog.createTable(identifier, schema, true); ++ Table table = catalog.getTable(identifier); ++ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T")); ++ SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath); ++ ++ // snapshot 1: append ++ write( ++ table, ++ GenericRow.of(1, 1, 1), ++ GenericRow.of(1, 2, 1), ++ GenericRow.of(1, 3, 1), ++ GenericRow.of(2, 1, 1), ++ GenericRow.of(2, 2, 1)); ++ Snapshot snapshot1 = snapshotManager.snapshot(1); ++ assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount()); ++ assertThat(snapshot1.totalRecordCount()).isEqualTo(5L); ++ assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L); ++ // snapshot 2: append ++ write( ++ table, ++ GenericRow.of(1, 1, 2), ++ GenericRow.of(1, 2, 2), ++ GenericRow.of(1, 4, 1), ++ GenericRow.of(2, 2, 2), ++ GenericRow.of(2, 3, 1)); ++ Snapshot snapshot2 = snapshotManager.snapshot(2); ++ assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount()); ++ assertThat(snapshot2.totalRecordCount()).isEqualTo(10L); ++ assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L); ++ // snapshot 3: compact ++ compact(table, row(1), 0); ++ ++ Snapshot snapshot3 = snapshotManager.snapshot(3); ++ ++ assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount()); ++ assertThat(snapshot3.totalRecordCount()).isEqualTo(8L); ++ assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L); ++ // snapshot 4: compact ++ compact(table, row(2), 0); ++ ++ Snapshot snapshot4 = snapshotManager.snapshot(4); ++ ++ assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount()); ++ assertThat(snapshot4.totalRecordCount()).isEqualTo(7L); ++ assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L); ++ } ++ + @Test + public void testAppendTable() throws Exception { + Identifier identifier = identifier("T"); diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java index 15ebf8e1d898..0bbeda7d1a7d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java +++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java @@ -20,6 +20,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -389,6 +390,20 @@ public static long recordCount(List manifestEntries) { return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum(); } + public static long recordCountAdd(List manifestEntries) { + return manifestEntries.stream() + .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind())) + .mapToLong(manifest -> manifest.file().rowCount()) + .sum(); + } + + public static long recordCountDelete(List manifestEntries) { + return manifestEntries.stream() + .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind())) + .mapToLong(manifest -> manifest.file().rowCount()) + .sum(); + } + public String toJson() { return JsonSerdeUtil.toJson(this); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 59dee53588e3..45d943ee475d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -640,8 +640,12 @@ public boolean tryCommitOnce( partitionType)); previousChangesListName = manifestList.write(newMetas); + // the added records subtract the deleted records from + long deltaRecordCount = + Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles); + long totalRecordCount = previousTotalRecordCount + deltaRecordCount; + // write new changes into manifest files - long deltaRecordCount = Snapshot.recordCount(tableFiles); List newChangesManifests = manifestFile.write(tableFiles); newMetas.addAll(newChangesManifests); newChangesListName = manifestList.write(newChangesManifests); @@ -672,7 +676,7 @@ public boolean tryCommitOnce( commitKind, System.currentTimeMillis(), logOffsets, - previousTotalRecordCount + deltaRecordCount, + totalRecordCount, deltaRecordCount, Snapshot.recordCount(changelogFiles), currentWatermark); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java index dd348e5bc82c..629a74a44990 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -132,6 +133,101 @@ public void testPrimaryKeyTable() throws Exception { GenericRow.of(2, 2, 1)); } + @Test + public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception { + Identifier identifier = identifier("T"); + Schema schema = + Schema.newBuilder() + .column("pt", DataTypes.INT()) + .column("pk", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .build(); + catalog.createTable(identifier, schema, true); + Table table = catalog.getTable(identifier); + Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T")); + SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath); + + // snapshot 1: append + write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1)); + Snapshot snapshot1 = snapshotManager.snapshot(1); + assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount()); + assertThat(snapshot1.totalRecordCount()).isEqualTo(3L); + assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L); + // snapshot 2: append + write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1)); + Snapshot snapshot2 = snapshotManager.snapshot(2); + assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount()); + assertThat(snapshot2.totalRecordCount()).isEqualTo(6L); + assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L); + // snapshot 3: compact + compact(table, row(1), 0); + Snapshot snapshot3 = snapshotManager.snapshot(3); + assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount()); + assertThat(snapshot3.totalRecordCount()).isEqualTo(4L); + assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L); + System.out.println(snapshot3); + } + + @Test + public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception { + Identifier identifier = identifier("T"); + Schema schema = + Schema.newBuilder() + .column("pt", DataTypes.INT()) + .column("pk", DataTypes.INT()) + .column("col1", DataTypes.INT()) + .partitionKeys("pt") + .primaryKey("pk", "pt") + .build(); + catalog.createTable(identifier, schema, true); + Table table = catalog.getTable(identifier); + Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T")); + SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath); + + // snapshot 1: append + write( + table, + GenericRow.of(1, 1, 1), + GenericRow.of(1, 2, 1), + GenericRow.of(1, 3, 1), + GenericRow.of(2, 1, 1), + GenericRow.of(2, 2, 1)); + Snapshot snapshot1 = snapshotManager.snapshot(1); + assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount()); + assertThat(snapshot1.totalRecordCount()).isEqualTo(5L); + assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L); + // snapshot 2: append + write( + table, + GenericRow.of(1, 1, 2), + GenericRow.of(1, 2, 2), + GenericRow.of(1, 4, 1), + GenericRow.of(2, 2, 2), + GenericRow.of(2, 3, 1)); + Snapshot snapshot2 = snapshotManager.snapshot(2); + assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount()); + assertThat(snapshot2.totalRecordCount()).isEqualTo(10L); + assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L); + // snapshot 3: compact + compact(table, row(1), 0); + + Snapshot snapshot3 = snapshotManager.snapshot(3); + + assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount()); + assertThat(snapshot3.totalRecordCount()).isEqualTo(8L); + assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L); + // snapshot 4: compact + compact(table, row(2), 0); + + Snapshot snapshot4 = snapshotManager.snapshot(4); + + assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount()); + assertThat(snapshot4.totalRecordCount()).isEqualTo(7L); + assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L); + } + @Test public void testAppendTable() throws Exception { Identifier identifier = identifier("T");