Skip to content

Commit

Permalink
[core] fix totalRecordCount incorrect when compact in primary key table
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc committed Sep 13, 2023
1 parent 4f1fdbf commit 5d94c13
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 2 deletions.
186 changes: 186 additions & 0 deletions 2009.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
From 3440d2220a7f11fd77101772d33c72769cf0acf9 Mon Sep 17 00:00:00 2001
From: "daowu.hzy" <[email protected]>
Date: Wed, 13 Sep 2023 23:38:41 +0800
Subject: [PATCH] [core] fix totalRecordCount error when compact in primary key
table

---
.../main/java/org/apache/paimon/Snapshot.java | 15 +++
.../paimon/operation/FileStoreCommitImpl.java | 8 +-
.../table/IncrementalTimeStampTableTest.java | 96 +++++++++++++++++++
3 files changed, 117 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 15ebf8e1d89..0bbeda7d1a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -20,6 +20,7 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
@@ -389,6 +390,20 @@ public static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
}

+ public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream()
+ .filter(manifestEntry -> FileKind.ADD.equals(manifestEntry.kind()))
+ .mapToLong(manifest -> manifest.file().rowCount())
+ .sum();
+ }
+
+ public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
+ return manifestEntries.stream()
+ .filter(manifestEntry -> FileKind.DELETE.equals(manifestEntry.kind()))
+ .mapToLong(manifest -> manifest.file().rowCount())
+ .sum();
+ }
+
public String toJson() {
return JsonSerdeUtil.toJson(this);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 59dee53588e..45d943ee475 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -640,8 +640,12 @@ public boolean tryCommitOnce(
partitionType));
previousChangesListName = manifestList.write(newMetas);

+ // the added records subtract the deleted records from
+ long deltaRecordCount =
+ Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles);
+ long totalRecordCount = previousTotalRecordCount + deltaRecordCount;
+
// write new changes into manifest files
- long deltaRecordCount = Snapshot.recordCount(tableFiles);
List<ManifestFileMeta> newChangesManifests = manifestFile.write(tableFiles);
newMetas.addAll(newChangesManifests);
newChangesListName = manifestList.write(newChangesManifests);
@@ -672,7 +676,7 @@ public boolean tryCommitOnce(
commitKind,
System.currentTimeMillis(),
logOffsets,
- previousTotalRecordCount + deltaRecordCount,
+ totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
currentWatermark);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
index dd348e5bc82..629a74a4499 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTimeStampTableTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -132,6 +133,101 @@ public void testPrimaryKeyTable() throws Exception {
GenericRow.of(2, 2, 1));
}

+ @Test
+ public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
+ SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+
+ // snapshot 1: append
+ write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1));
+ Snapshot snapshot1 = snapshotManager.snapshot(1);
+ assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
+ assertThat(snapshot1.totalRecordCount()).isEqualTo(3L);
+ assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L);
+ // snapshot 2: append
+ write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1));
+ Snapshot snapshot2 = snapshotManager.snapshot(2);
+ assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
+ assertThat(snapshot2.totalRecordCount()).isEqualTo(6L);
+ assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L);
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+ Snapshot snapshot3 = snapshotManager.snapshot(3);
+ assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
+ assertThat(snapshot3.totalRecordCount()).isEqualTo(4L);
+ assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
+ System.out.println(snapshot3);
+ }
+
+ @Test
+ public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pt", DataTypes.INT())
+ .column("pk", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
+ SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);
+
+ // snapshot 1: append
+ write(
+ table,
+ GenericRow.of(1, 1, 1),
+ GenericRow.of(1, 2, 1),
+ GenericRow.of(1, 3, 1),
+ GenericRow.of(2, 1, 1),
+ GenericRow.of(2, 2, 1));
+ Snapshot snapshot1 = snapshotManager.snapshot(1);
+ assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
+ assertThat(snapshot1.totalRecordCount()).isEqualTo(5L);
+ assertThat(snapshot1.deltaRecordCount()).isEqualTo(5L);
+ // snapshot 2: append
+ write(
+ table,
+ GenericRow.of(1, 1, 2),
+ GenericRow.of(1, 2, 2),
+ GenericRow.of(1, 4, 1),
+ GenericRow.of(2, 2, 2),
+ GenericRow.of(2, 3, 1));
+ Snapshot snapshot2 = snapshotManager.snapshot(2);
+ assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
+ assertThat(snapshot2.totalRecordCount()).isEqualTo(10L);
+ assertThat(snapshot2.deltaRecordCount()).isEqualTo(5L);
+ // snapshot 3: compact
+ compact(table, row(1), 0);
+
+ Snapshot snapshot3 = snapshotManager.snapshot(3);
+
+ assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
+ assertThat(snapshot3.totalRecordCount()).isEqualTo(8L);
+ assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
+ // snapshot 4: compact
+ compact(table, row(2), 0);
+
+ Snapshot snapshot4 = snapshotManager.snapshot(4);
+
+ assertThat(snapshot4.totalRecordCount()).isGreaterThan(snapshot4.deltaRecordCount());
+ assertThat(snapshot4.totalRecordCount()).isEqualTo(7L);
+ assertThat(snapshot4.deltaRecordCount()).isEqualTo(-1L);
+ }
+
@Test
public void testAppendTable() throws Exception {
Identifier identifier = identifier("T");
15 changes: 15 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
Expand Down Expand Up @@ -389,6 +390,20 @@ public static long recordCount(List<ManifestEntry> manifestEntries) {
return manifestEntries.stream().mapToLong(manifest -> manifest.file().rowCount()).sum();
}

/**
 * Sums the row counts of the data files referenced by entries whose kind is {@link
 * FileKind#ADD}; entries of any other kind are ignored.
 *
 * @param manifestEntries entries to inspect
 * @return total {@code rowCount()} over the ADD entries, {@code 0} when there are none
 */
public static long recordCountAdd(List<ManifestEntry> manifestEntries) {
    long addedRows = 0L;
    for (ManifestEntry entry : manifestEntries) {
        if (FileKind.ADD.equals(entry.kind())) {
            addedRows += entry.file().rowCount();
        }
    }
    return addedRows;
}

/**
 * Sums the row counts of the data files referenced by entries whose kind is {@link
 * FileKind#DELETE}; entries of any other kind are ignored.
 *
 * @param manifestEntries entries to inspect
 * @return total {@code rowCount()} over the DELETE entries, {@code 0} when there are none
 */
public static long recordCountDelete(List<ManifestEntry> manifestEntries) {
    long deletedRows = 0L;
    for (ManifestEntry entry : manifestEntries) {
        if (FileKind.DELETE.equals(entry.kind())) {
            deletedRows += entry.file().rowCount();
        }
    }
    return deletedRows;
}

public String toJson() {
return JsonSerdeUtil.toJson(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,12 @@ public boolean tryCommitOnce(
partitionType));
previousChangesListName = manifestList.write(newMetas);

// delta = records in added files minus records in deleted files
long deltaRecordCount =
Snapshot.recordCountAdd(tableFiles) - Snapshot.recordCountDelete(tableFiles);
long totalRecordCount = previousTotalRecordCount + deltaRecordCount;

// write new changes into manifest files
long deltaRecordCount = Snapshot.recordCount(tableFiles);
List<ManifestFileMeta> newChangesManifests = manifestFile.write(tableFiles);
newMetas.addAll(newChangesManifests);
newChangesListName = manifestList.write(newChangesManifests);
Expand Down Expand Up @@ -672,7 +676,7 @@ public boolean tryCommitOnce(
commitKind,
System.currentTimeMillis(),
logOffsets,
previousTotalRecordCount + deltaRecordCount,
totalRecordCount,
deltaRecordCount,
Snapshot.recordCount(changelogFiles),
currentWatermark);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -132,6 +133,101 @@ public void testPrimaryKeyTable() throws Exception {
GenericRow.of(2, 2, 1));
}

/**
 * Checks {@code totalRecordCount} and {@code deltaRecordCount} of the snapshots produced by two
 * writes and one compaction on a primary key table whose rows all use {@code pt = 1}.
 */
@Test
public void testPrimaryKeyTableTotalRecordCountWithOnePartition() throws Exception {
    Identifier identifier = identifier("T");
    Schema schema =
            Schema.newBuilder()
                    .column("pt", DataTypes.INT())
                    .column("pk", DataTypes.INT())
                    .column("col1", DataTypes.INT())
                    .partitionKeys("pt")
                    .primaryKey("pk", "pt")
                    .build();
    catalog.createTable(identifier, schema, true);
    Table table = catalog.getTable(identifier);
    Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
    SnapshotManager snapshotManager = new SnapshotManager(LocalFileIO.create(), tablePath);

    // snapshot 1: append three rows (pk 1, 2, 3); total and delta are both 3
    write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 1), GenericRow.of(1, 3, 1));
    Snapshot snapshot1 = snapshotManager.snapshot(1);
    assertThat(snapshot1.totalRecordCount()).isEqualTo(snapshot1.deltaRecordCount());
    assertThat(snapshot1.totalRecordCount()).isEqualTo(3L);
    assertThat(snapshot1.deltaRecordCount()).isEqualTo(3L);

    // snapshot 2: append three more rows (pk 1, 2, 4); pk 1 and 2 repeat earlier keys, but the
    // total is still expected to be the plain sum 3 + 3 = 6 before compaction
    write(table, GenericRow.of(1, 1, 2), GenericRow.of(1, 2, 2), GenericRow.of(1, 4, 1));
    Snapshot snapshot2 = snapshotManager.snapshot(2);
    assertThat(snapshot2.totalRecordCount()).isGreaterThan(snapshot2.deltaRecordCount());
    assertThat(snapshot2.totalRecordCount()).isEqualTo(6L);
    assertThat(snapshot2.deltaRecordCount()).isEqualTo(3L);

    // snapshot 3: compact; four distinct keys (pk 1, 2, 3, 4) are expected to remain, so the
    // delta is negative (-2) and the total drops from 6 to 4
    compact(table, row(1), 0);
    Snapshot snapshot3 = snapshotManager.snapshot(3);
    assertThat(snapshot3.totalRecordCount()).isGreaterThan(snapshot3.deltaRecordCount());
    assertThat(snapshot3.totalRecordCount()).isEqualTo(4L);
    assertThat(snapshot3.deltaRecordCount()).isEqualTo(-2L);
}

/**
 * Checks {@code totalRecordCount} and {@code deltaRecordCount} across two writes that touch
 * {@code pt = 1} and {@code pt = 2}, followed by two separate compactions.
 */
@Test
public void testPrimaryKeyTableTotalRecordCountWithMultiPartition() throws Exception {
    Identifier tableId = identifier("T");
    Schema tableSchema =
            Schema.newBuilder()
                    .column("pt", DataTypes.INT())
                    .column("pk", DataTypes.INT())
                    .column("col1", DataTypes.INT())
                    .partitionKeys("pt")
                    .primaryKey("pk", "pt")
                    .build();
    catalog.createTable(tableId, tableSchema, true);
    Table target = catalog.getTable(tableId);
    Path location = new Path(String.format("%s/%s.db/%s", warehouse, database, "T"));
    SnapshotManager snapshots = new SnapshotManager(LocalFileIO.create(), location);

    // First write: three rows with pt = 1 and two rows with pt = 2.
    write(
            target,
            GenericRow.of(1, 1, 1),
            GenericRow.of(1, 2, 1),
            GenericRow.of(1, 3, 1),
            GenericRow.of(2, 1, 1),
            GenericRow.of(2, 2, 1));
    Snapshot afterFirstWrite = snapshots.snapshot(1);
    assertThat(afterFirstWrite.totalRecordCount())
            .isEqualTo(afterFirstWrite.deltaRecordCount());
    assertThat(afterFirstWrite.totalRecordCount()).isEqualTo(5L);
    assertThat(afterFirstWrite.deltaRecordCount()).isEqualTo(5L);

    // Second write: five more rows, some of which repeat keys from the first write.
    write(
            target,
            GenericRow.of(1, 1, 2),
            GenericRow.of(1, 2, 2),
            GenericRow.of(1, 4, 1),
            GenericRow.of(2, 2, 2),
            GenericRow.of(2, 3, 1));
    Snapshot afterSecondWrite = snapshots.snapshot(2);
    assertThat(afterSecondWrite.totalRecordCount())
            .isGreaterThan(afterSecondWrite.deltaRecordCount());
    assertThat(afterSecondWrite.totalRecordCount()).isEqualTo(10L);
    assertThat(afterSecondWrite.deltaRecordCount()).isEqualTo(5L);

    // First compaction, called with row(1) (presumably partition pt = 1): 10 -> 8 records.
    compact(target, row(1), 0);
    Snapshot afterFirstCompact = snapshots.snapshot(3);
    assertThat(afterFirstCompact.totalRecordCount())
            .isGreaterThan(afterFirstCompact.deltaRecordCount());
    assertThat(afterFirstCompact.totalRecordCount()).isEqualTo(8L);
    assertThat(afterFirstCompact.deltaRecordCount()).isEqualTo(-2L);

    // Second compaction, called with row(2) (presumably partition pt = 2): 8 -> 7 records.
    compact(target, row(2), 0);
    Snapshot afterSecondCompact = snapshots.snapshot(4);
    assertThat(afterSecondCompact.totalRecordCount())
            .isGreaterThan(afterSecondCompact.deltaRecordCount());
    assertThat(afterSecondCompact.totalRecordCount()).isEqualTo(7L);
    assertThat(afterSecondCompact.deltaRecordCount()).isEqualTo(-1L);
}

@Test
public void testAppendTable() throws Exception {
Identifier identifier = identifier("T");
Expand Down

0 comments on commit 5d94c13

Please sign in to comment.