diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md index 0cb52b6f2584..4343214f2c28 100644 --- a/docs/content/maintenance/manage-branches.md +++ b/docs/content/maintenance/manage-branches.md @@ -118,6 +118,7 @@ You can read or write with branch as below. ```sql -- read from branch 'branch1' SELECT * FROM `t$branch_branch1`; +SELECT * FROM `t$branch_branch1` /*+ OPTIONS('consumer-id' = 'myid') */; -- write to branch 'branch1' INSERT INTO `t$branch_branch1` SELECT ... diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java index 6a928b81cdb3..093031b065c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java @@ -21,6 +21,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.utils.DateTimeUtils; +import org.apache.paimon.utils.StringUtils; import java.io.IOException; import java.io.Serializable; @@ -33,6 +34,8 @@ import java.util.OptionalLong; import java.util.stream.Collectors; +import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; +import static org.apache.paimon.utils.BranchManager.branchPath; import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles; import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus; @@ -46,9 +49,16 @@ public class ConsumerManager implements Serializable { private final FileIO fileIO; private final Path tablePath; + private final String branch; + public ConsumerManager(FileIO fileIO, Path tablePath) { + this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + + public ConsumerManager(FileIO fileIO, Path tablePath, String branchName) { this.fileIO = fileIO; this.tablePath = tablePath; + this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName; } public Optional consumer(String consumerId) { @@ -119,10 +129,11 @@ public List listAllIds() { } private Path consumerDirectory() { - return new Path(tablePath + "/consumer"); + return new Path(branchPath(tablePath, branch) + "/consumer"); } private Path consumerPath(String consumerId) { - return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX + consumerId); + return new Path( + branchPath(tablePath, branch) + "/consumer/" + CONSUMER_PREFIX + consumerId); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 0171ff677dd5..818b6e87ee3e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -374,7 +374,7 @@ public TableCommitImpl newCommit(String commitUser) { options.writeOnly() ? null : store().newTagCreationManager(), catalogEnvironment.lockFactory().create(), CoreOptions.fromMap(options()).consumerExpireTime(), - new ConsumerManager(fileIO, path), + new ConsumerManager(fileIO, path, snapshotManager().branch()), coreOptions().snapshotExpireExecutionMode(), name(), coreOptions().forceCreatingSnapshot()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index 6d5e76d5b85b..759088a06dfa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -56,7 +56,10 @@ public ExpireChangelogImpl( this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.consumerManager = - new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new ConsumerManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); this.changelogDeletion = changelogDeletion; this.expireConfig = ExpireConfig.builder().build(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 73fac37e6780..1700472978ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -56,7 +56,10 @@ public ExpireSnapshotsImpl( TagManager tagManager) { this.snapshotManager = snapshotManager; this.consumerManager = - new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new ConsumerManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); this.snapshotDeletion = snapshotDeletion; this.tagManager = tagManager; this.expireConfig = ExpireConfig.builder().build(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index bd9e48e4cc95..1b58fea9167c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -98,7 +98,10 @@ public SnapshotReaderImpl( this.deletionVectors = options.deletionVectorsEnabled(); this.snapshotManager = snapshotManager; this.consumerManager = - new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + new ConsumerManager( + snapshotManager.fileIO(), + snapshotManager.tablePath(), + snapshotManager.branch()); this.splitGenerator = splitGenerator; this.nonPartitionFilterConsumer = nonPartitionFilterConsumer; this.defaultValueAssigner = defaultValueAssigner; diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index ca88259defef..cf0b44b5b4eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -90,6 +90,10 @@ public Path tablePath() { return tablePath; } + public String branch() { + return branch; + } + public Path changelogDirectory() { return new Path(branchPath(tablePath, branch) + "/changelog"); } diff --git a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java index 9ee320754f22..9ed685a4de90 100644 --- a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java @@ -39,11 +39,18 @@ public class ConsumerManagerTest { private ConsumerManager manager; + private ConsumerManager consumerManagerBranch; + @BeforeEach public void before() { this.manager = new ConsumerManager( LocalFileIO.create(), new org.apache.paimon.fs.Path(tempDir.toUri())); + this.consumerManagerBranch = + new ConsumerManager( + LocalFileIO.create(), + new org.apache.paimon.fs.Path(tempDir.toUri()), + "branch1"); } @Test @@ -62,6 +69,21 @@ public void test() { assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L); assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L)); + + Optional consumerBranch = consumerManagerBranch.consumer("id1"); + assertThat(consumerBranch).isEmpty(); + + assertThat(consumerManagerBranch.minNextSnapshot()).isEmpty(); + + consumerManagerBranch.resetConsumer("id1", new Consumer(5)); + consumerBranch = consumerManagerBranch.consumer("id1"); + assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(5L); + + consumerManagerBranch.resetConsumer("id2", new Consumer(8)); + consumerBranch = consumerManagerBranch.consumer("id2"); + assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(8L); + + assertThat(consumerManagerBranch.minNextSnapshot()).isEqualTo(OptionalLong.of(5L)); } @Test @@ -83,11 +105,41 @@ public void testExpire() throws Exception { manager.resetConsumer("id2", new Consumer(3)); manager.expire(expireDateTime); assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L); + + consumerManagerBranch.resetConsumer("id3", new Consumer(1)); + Thread.sleep(1000); + LocalDateTime expireDateTimeBranch = + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()); + Thread.sleep(1000); + consumerManagerBranch.resetConsumer("id4", new Consumer(2)); + + // check expire + consumerManagerBranch.expire(expireDateTimeBranch); + assertThat(consumerManagerBranch.consumer("id3")).isEmpty(); + assertThat(consumerManagerBranch.consumer("id4")) + .map(Consumer::nextSnapshot) + .get() + .isEqualTo(2L); + + // check last modification + expireDateTimeBranch = DateTimeUtils.toLocalDateTime(System.currentTimeMillis()); + Thread.sleep(1000); + consumerManagerBranch.resetConsumer("id4", new Consumer(3)); + consumerManagerBranch.expire(expireDateTimeBranch); + assertThat(consumerManagerBranch.consumer("id4")) + .map(Consumer::nextSnapshot) + .get() + .isEqualTo(3L); } @Test public void testReadConsumer() throws Exception { manager.resetConsumer("id1", new Consumer(5)); assertThat(manager.consumer("id1")); + + consumerManagerBranch.resetConsumer("id2", new Consumer(5)); + assertThat(consumerManagerBranch.consumer("id2")); + + assertThat(manager.consumer("id2")).isEmpty(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java index d4d13cc2eb17..615b448ec9bb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java @@ -50,7 +50,10 @@ public ResetConsumerAction withNextSnapshotIds(Long nextSnapshotId) { public void run() throws Exception { FileStoreTable dataTable = (FileStoreTable) table; ConsumerManager consumerManager = - new ConsumerManager(dataTable.fileIO(), dataTable.location()); + new ConsumerManager( + dataTable.fileIO(), + dataTable.location(), + dataTable.snapshotManager().branch()); if (Objects.isNull(nextSnapshotId)) { consumerManager.deleteConsumer(consumerId); } else { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java index 6ff4df5a1ed7..0355d6dc1cab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java @@ -50,7 +50,10 @@ public String[] call( FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ConsumerManager consumerManager = - new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location()); + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId)); return new String[] {"Success"}; @@ -61,7 +64,10 @@ public String[] call(ProcedureContext procedureContext, String tableId, String c FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ConsumerManager consumerManager = - new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location()); + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); consumerManager.deleteConsumer(consumerId); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index 4818c97e64d2..6a17a4512de7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -119,4 +119,88 @@ public void testResetConsumer() throws Exception { Optional consumer3 = consumerManager.consumer("myid"); assertThat(consumer3).isNotPresent(); } + + @Test + public void testResetBranchConsumer() throws Exception { + init(warehouse); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"pk1", "col1"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk1"), + Collections.emptyList(), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + String branchName = "b1"; + table.createBranch("b1", 3); + String branchTableName = tableName + "$branch_b1"; + + // use consumer streaming read table + testStreamingRead( + "SELECT * FROM `" + + branchTableName + + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */", + Arrays.asList( + changelogRow("+I", 1L, "Hi"), + changelogRow("+I", 2L, "Hello"), + changelogRow("+I", 3L, "Paimon"))) + .close(); + + ConsumerManager consumerManager = + new ConsumerManager(table.fileIO(), table.location(), branchName); + Optional consumer1 = consumerManager.consumer("myid"); + assertThat(consumer1).isPresent(); + assertThat(consumer1.get().nextSnapshot()).isEqualTo(4); + + List args = + Arrays.asList( + "reset_consumer", + "--warehouse", + warehouse, + "--database", + database, + "--table", + branchTableName, + "--consumer_id", + "myid", + "--next_snapshot", + "1"); + // reset consumer + if (ThreadLocalRandom.current().nextBoolean()) { + createAction(ResetConsumerAction.class, args).run(); + } else { + callProcedure( + String.format( + "CALL sys.reset_consumer('%s.%s', 'myid', 1)", + database, branchTableName)); + } + Optional consumer2 = consumerManager.consumer("myid"); + assertThat(consumer2).isPresent(); + assertThat(consumer2.get().nextSnapshot()).isEqualTo(1); + + // delete consumer + if (ThreadLocalRandom.current().nextBoolean()) { + createAction(ResetConsumerAction.class, args.subList(0, 9)).run(); + } else { + callProcedure( + String.format( + "CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName)); + } + Optional consumer3 = consumerManager.consumer("myid"); + assertThat(consumer3).isNotPresent(); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java index aafed90a3dc2..a13227e95dc7 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java @@ -83,7 +83,10 @@ public InternalRow[] call(InternalRow args) { table -> { FileStoreTable fileStoreTable = (FileStoreTable) table; ConsumerManager consumerManager = - new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location()); + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); if (nextSnapshotId == null) { consumerManager.deleteConsumer(consumerId); } else {