Skip to content

Commit

Permalink
[core] Support set snapshot-id when query by consumer-id
Browse files Browse the repository at this point in the history
  • Loading branch information
xuyu committed Oct 11, 2024
1 parent 2542fc8 commit 8e9e78e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@
<td><p>Enum</p></td>
<td>Specify the consumer consistency mode for table.<br /><br />Possible values:<ul><li>"exactly-once": Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed.</li><li>"at-least-once": Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.</li></ul></td>
</tr>
<tr>
<td><h5>consumer.snapshot-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Specify the consumer startup mode from snapshot-id, if set it would reset snapshot-id firstly and consumer from the snapshot, default from the snapshot-id before.</td>
</tr>
<tr>
<td><h5>continuous.discovery-interval</h5></td>
<td style="word-wrap: break-word;">10 s</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,13 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to ignore consumer progress for the newly started job.");

public static final ConfigOption<Long> CONSUMER_SNAPSHOT_ID =
key("consumer.snapshot-id")
.longType()
.noDefaultValue()
.withDescription(
"Specify the consumer startup mode from snapshot-id, if set it would reset snapshot-id firstly and consumer from the snapshot, default from the snapshot-id before.");

public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
Expand Down Expand Up @@ -2075,6 +2082,10 @@ public boolean consumerIgnoreProgress() {
return options.get(CONSUMER_IGNORE_PROGRESS);
}

public Long consumerStartupFromSnapshotID() {
return options.get(CONSUMER_SNAPSHOT_ID);
}

public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -62,6 +65,8 @@ public abstract class AbstractDataTableScan implements DataTableScan {
private final CoreOptions options;
protected final SnapshotReader snapshotReader;

private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class);

protected AbstractDataTableScan(CoreOptions options, SnapshotReader snapshotReader) {
this.options = options;
this.snapshotReader = snapshotReader;
Expand Down Expand Up @@ -123,6 +128,17 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
String consumerId = options.consumerId();
if (isStreaming && consumerId != null && !options.consumerIgnoreProgress()) {
ConsumerManager consumerManager = snapshotReader.consumerManager();
Long snapshotId = options.consumerStartupFromSnapshotID();
if (snapshotId != null) {
if (snapshotManager.snapshotExists(snapshotId)) {
consumerManager.resetConsumer(consumerId, new Consumer(snapshotId));
} else {
LOG.warn(
String.format(
"You set consumer.snapshot-id: %s is not exisit, it would not work!",
snapshotId));
}
}
Optional<Consumer> consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
return new ContinuousFromSnapshotStartingScanner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ public void testResetConsumer(String invoker) throws Exception {
changelogRow("+I", 3L, "Paimon")))
.close();

// use consumer streaming read table with consumer.snapshot-id
testStreamingRead(
"SELECT * FROM `"
+ tableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h','consumer.snapshot-id'='2') */",
Arrays.asList(
changelogRow("+I", 2L, "Hello"), changelogRow("+I", 3L, "Paimon")))
.close();

testStreamingRead(
"SELECT * FROM `"
+ tableName
+ "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h','consumer.snapshot-id'='3') */",
Arrays.asList(changelogRow("+I", 3L, "Paimon")))
.close();

Thread.sleep(1000);
ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
Optional<Consumer> consumer1 = consumerManager.consumer("myid");
Expand Down

0 comments on commit 8e9e78e

Please sign in to comment.