Skip to content

Commit

Permalink
[core] Consumer-id support branch (#3790)
Browse files Browse the repository at this point in the history
  • Loading branch information
herefree authored Jul 23, 2024
1 parent 52f2570 commit 4184489
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/content/maintenance/manage-branches.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ You can read or write with branch as below.
```sql
-- read from branch 'branch1'
SELECT * FROM `t$branch_branch1`;
SELECT * FROM `t$branch_branch1` /*+ OPTIONS('consumer-id' = 'myid') */;

-- write to branch 'branch1'
INSERT INTO `t$branch_branch1` SELECT ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.StringUtils;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -33,6 +34,8 @@
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;

Expand All @@ -46,9 +49,16 @@ public class ConsumerManager implements Serializable {
private final FileIO fileIO;
private final Path tablePath;

private final String branch;

public ConsumerManager(FileIO fileIO, Path tablePath) {
this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
}

public ConsumerManager(FileIO fileIO, Path tablePath, String branchName) {
this.fileIO = fileIO;
this.tablePath = tablePath;
this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName;
}

public Optional<Consumer> consumer(String consumerId) {
Expand Down Expand Up @@ -119,10 +129,11 @@ public List<String> listAllIds() {
}

private Path consumerDirectory() {
return new Path(tablePath + "/consumer");
return new Path(branchPath(tablePath, branch) + "/consumer");
}

private Path consumerPath(String consumerId) {
return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX + consumerId);
return new Path(
branchPath(tablePath, branch) + "/consumer/" + CONSUMER_PREFIX + consumerId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ public TableCommitImpl newCommit(String commitUser) {
options.writeOnly() ? null : store().newTagCreationManager(),
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
coreOptions().snapshotExpireExecutionMode(),
name(),
coreOptions().forceCreatingSnapshot());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public ExpireChangelogImpl(
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
new ConsumerManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch());
this.changelogDeletion = changelogDeletion;
this.expireConfig = ExpireConfig.builder().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public ExpireSnapshotsImpl(
TagManager tagManager) {
this.snapshotManager = snapshotManager;
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
new ConsumerManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch());
this.snapshotDeletion = snapshotDeletion;
this.tagManager = tagManager;
this.expireConfig = ExpireConfig.builder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ public SnapshotReaderImpl(
this.deletionVectors = options.deletionVectorsEnabled();
this.snapshotManager = snapshotManager;
this.consumerManager =
new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
new ConsumerManager(
snapshotManager.fileIO(),
snapshotManager.tablePath(),
snapshotManager.branch());
this.splitGenerator = splitGenerator;
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
this.defaultValueAssigner = defaultValueAssigner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public Path tablePath() {
return tablePath;
}

public String branch() {
return branch;
}

public Path changelogDirectory() {
return new Path(branchPath(tablePath, branch) + "/changelog");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,18 @@ public class ConsumerManagerTest {

private ConsumerManager manager;

private ConsumerManager consumerManagerBranch;

@BeforeEach
public void before() {
this.manager =
new ConsumerManager(
LocalFileIO.create(), new org.apache.paimon.fs.Path(tempDir.toUri()));
this.consumerManagerBranch =
new ConsumerManager(
LocalFileIO.create(),
new org.apache.paimon.fs.Path(tempDir.toUri()),
"branch1");
}

@Test
Expand All @@ -62,6 +69,21 @@ public void test() {
assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);

assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));

Optional<Consumer> consumerBranch = consumerManagerBranch.consumer("id1");
assertThat(consumerBranch).isEmpty();

assertThat(consumerManagerBranch.minNextSnapshot()).isEmpty();

consumerManagerBranch.resetConsumer("id1", new Consumer(5));
consumerBranch = consumerManagerBranch.consumer("id1");
assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(5L);

consumerManagerBranch.resetConsumer("id2", new Consumer(8));
consumerBranch = consumerManagerBranch.consumer("id2");
assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(8L);

assertThat(consumerManagerBranch.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
}

@Test
Expand All @@ -83,11 +105,41 @@ public void testExpire() throws Exception {
manager.resetConsumer("id2", new Consumer(3));
manager.expire(expireDateTime);
assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L);

consumerManagerBranch.resetConsumer("id3", new Consumer(1));
Thread.sleep(1000);
LocalDateTime expireDateTimeBranch =
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
Thread.sleep(1000);
consumerManagerBranch.resetConsumer("id4", new Consumer(2));

// check expire
consumerManagerBranch.expire(expireDateTimeBranch);
assertThat(consumerManagerBranch.consumer("id3")).isEmpty();
assertThat(consumerManagerBranch.consumer("id4"))
.map(Consumer::nextSnapshot)
.get()
.isEqualTo(2L);

// check last modification
expireDateTimeBranch = DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
Thread.sleep(1000);
consumerManagerBranch.resetConsumer("id4", new Consumer(3));
consumerManagerBranch.expire(expireDateTimeBranch);
assertThat(consumerManagerBranch.consumer("id4"))
.map(Consumer::nextSnapshot)
.get()
.isEqualTo(3L);
}

@Test
public void testReadConsumer() throws Exception {
manager.resetConsumer("id1", new Consumer(5));
assertThat(manager.consumer("id1"));

consumerManagerBranch.resetConsumer("id2", new Consumer(5));
assertThat(consumerManagerBranch.consumer("id2"));

assertThat(manager.consumer("id2")).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public ResetConsumerAction withNextSnapshotIds(Long nextSnapshotId) {
public void run() throws Exception {
FileStoreTable dataTable = (FileStoreTable) table;
ConsumerManager consumerManager =
new ConsumerManager(dataTable.fileIO(), dataTable.location());
new ConsumerManager(
dataTable.fileIO(),
dataTable.location(),
dataTable.snapshotManager().branch());
if (Objects.isNull(nextSnapshotId)) {
consumerManager.deleteConsumer(consumerId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ public String[] call(
FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
ConsumerManager consumerManager =
new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
new ConsumerManager(
fileStoreTable.fileIO(),
fileStoreTable.location(),
fileStoreTable.snapshotManager().branch());
consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));

return new String[] {"Success"};
Expand All @@ -61,7 +64,10 @@ public String[] call(ProcedureContext procedureContext, String tableId, String c
FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
ConsumerManager consumerManager =
new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
new ConsumerManager(
fileStoreTable.fileIO(),
fileStoreTable.location(),
fileStoreTable.snapshotManager().branch());
consumerManager.deleteConsumer(consumerId);

return new String[] {"Success"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,88 @@ public void testResetConsumer() throws Exception {
Optional<Consumer> consumer3 = consumerManager.consumer("myid");
assertThat(consumer3).isNotPresent();
}

@Test
public void testResetBranchConsumer() throws Exception {
init(warehouse);

RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
new String[] {"pk1", "col1"});
FileStoreTable table =
createFileStoreTable(
rowType,
Collections.emptyList(),
Collections.singletonList("pk1"),
Collections.emptyList(),
Collections.emptyMap());

StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();

// 3 snapshots
writeData(rowData(1L, BinaryString.fromString("Hi")));
writeData(rowData(2L, BinaryString.fromString("Hello")));
writeData(rowData(3L, BinaryString.fromString("Paimon")));

String branchName = "b1";
table.createBranch("b1", 3);
String branchTableName = tableName + "$branch_b1";

// use consumer streaming read table
testStreamingRead(
"SELECT * FROM `"
+ branchTableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
Arrays.asList(
changelogRow("+I", 1L, "Hi"),
changelogRow("+I", 2L, "Hello"),
changelogRow("+I", 3L, "Paimon")))
.close();

ConsumerManager consumerManager =
new ConsumerManager(table.fileIO(), table.location(), branchName);
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
assertThat(consumer1).isPresent();
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);

List<String> args =
Arrays.asList(
"reset_consumer",
"--warehouse",
warehouse,
"--database",
database,
"--table",
branchTableName,
"--consumer_id",
"myid",
"--next_snapshot",
"1");
// reset consumer
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(ResetConsumerAction.class, args).run();
} else {
callProcedure(
String.format(
"CALL sys.reset_consumer('%s.%s', 'myid', 1)",
database, branchTableName));
}
Optional<Consumer> consumer2 = consumerManager.consumer("myid");
assertThat(consumer2).isPresent();
assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);

// delete consumer
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
} else {
callProcedure(
String.format(
"CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName));
}
Optional<Consumer> consumer3 = consumerManager.consumer("myid");
assertThat(consumer3).isNotPresent();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ public InternalRow[] call(InternalRow args) {
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
ConsumerManager consumerManager =
new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
new ConsumerManager(
fileStoreTable.fileIO(),
fileStoreTable.location(),
fileStoreTable.snapshotManager().branch());
if (nextSnapshotId == null) {
consumerManager.deleteConsumer(consumerId);
} else {
Expand Down

0 comments on commit 4184489

Please sign in to comment.