Skip to content

Commit

Permalink
[core] Introduce consumer.ignore-progress (apache#2585)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 27, 2023
1 parent 793211e commit 0b2cc15
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 11 deletions.
3 changes: 2 additions & 1 deletion docs/content/how-to/querying-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
When reading Paimon tables in streaming mode, the next snapshot id to be consumed is recorded into the file system. This has several advantages:

1. When the previous job is stopped, the newly started job can continue to consume from the previous progress without
resuming from the state. The newly started job will begin reading from the next snapshot id found in the consumer files.
If you don't want this behavior, you can set `'consumer.ignore-progress'` to true.
2. When deciding whether a snapshot has expired, Paimon looks at all the consumers of the table in the file system,
and if there are consumers that still depend on this snapshot, then this snapshot will not be deleted by expiration.
3. When there is no watermark definition, the Paimon table will pass the watermark in the snapshot to the downstream
Expand Down
14 changes: 10 additions & 4 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@
<td>Duration</td>
<td>The expiration interval of consumer files. A consumer file will be expired if its lifetime after the last modification exceeds this value.</td>
</tr>
<tr>
<td><h5>consumer.ignore-progress</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to ignore consumer progress for the newly started job.</td>
</tr>
<tr>
<td><h5>consumer.mode</h5></td>
<td style="word-wrap: break-word;">exactly-once</td>
Expand Down Expand Up @@ -141,16 +147,16 @@
<td>Whether to ignore delete records in deduplicate mode.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.initial-buckets</h5></td>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
<td>Parallelism of the assigner operator for dynamic bucket mode. It is related to the number of initialized buckets; a value that is too small will lead to insufficient processing speed in the assigner.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.assigner-parallelism</h5></td>
<td><h5>dynamic-bucket.initial-buckets</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Parallelism of the assigner operator for dynamic bucket mode. It is related to the number of initialized buckets; a value that is too small will lead to insufficient processing speed in the assigner.</td>
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.target-row-num</h5></td>
Expand Down
11 changes: 9 additions & 2 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,13 @@ public class CoreOptions implements Serializable {
.defaultValue(ConsumerMode.EXACTLY_ONCE)
.withDescription("Specify the consumer consistency mode for table.");

/**
 * Option controlling whether a newly started job ignores the progress previously recorded for
 * its consumer id ({@code consumer.ignore-progress}); defaults to {@code false}.
 */
public static final ConfigOption<Boolean> CONSUMER_IGNORE_PROGRESS =
key("consumer.ignore-progress")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore consumer progress for the newly started job.");

public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
Expand Down Expand Up @@ -1420,8 +1427,8 @@ public Duration consumerExpireTime() {
return options.get(CONSUMER_EXPIRATION_TIME);
}

public ConsumerMode consumerWithLegacyMode() {
return options.get(CONSUMER_CONSISTENCY_MODE);
/**
 * Returns the value of {@code CONSUMER_IGNORE_PROGRESS}: whether a newly started job should
 * ignore the progress recorded for its consumer id.
 */
public boolean consumerIgnoreProgress() {
return options.get(CONSUMER_IGNORE_PROGRESS);
}

public boolean partitionedTableInMetastore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ public class Consumer {

private static final String FIELD_NEXT_SNAPSHOT = "nextSnapshot";

private static final int READ_CONSUMER_RETRY_NUM = 3;
private static final int READ_CONSUMER_RETRY_INTERVAL = 100;

private final long nextSnapshot;

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {

// read from consumer id
String consumerId = options.consumerId();
if (consumerId != null) {
if (consumerId != null && !options.consumerIgnoreProgress()) {
ConsumerManager consumerManager = snapshotReader.consumerManager();
Optional<Consumer> consumer = consumerManager.consumer(consumerId);
if (consumer.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.COMPACTION_MAX_FILE_NUM;
import static org.apache.paimon.CoreOptions.CONSUMER_IGNORE_PROGRESS;
import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_EXECUTION_MODE;
Expand Down Expand Up @@ -646,6 +647,18 @@ public void testConsumeId() throws Exception {
assertThat(result)
.containsExactlyInAnyOrder("+1|30|300|binary|varbinary|mapKey:mapVal|multiset");

// read again using the consumer id with consumer.ignore-progress enabled
scan =
table.copy(Collections.singletonMap(CONSUMER_IGNORE_PROGRESS.key(), "true"))
.newStreamScan();
List<Split> splits = scan.plan().splits();
result = getResult(read, splits, STREAMING_ROW_TO_STRING);
assertThat(result)
.containsExactlyInAnyOrder(
"+1|10|100|binary|varbinary|mapKey:mapVal|multiset",
"+1|20|200|binary|varbinary|mapKey:mapVal|multiset",
"+1|30|300|binary|varbinary|mapKey:mapVal|multiset");

// test snapshot expiration
for (int i = 3; i <= 8; i++) {
write.write(rowData(1, (i + 1) * 10, (i + 1) * 100L));
Expand All @@ -665,6 +678,7 @@ public void testConsumeId() throws Exception {
table.copy(
Collections.singletonMap(
CoreOptions.CONSUMER_EXPIRATION_TIME.key(), "1 s"));

// commit to trigger expiration
writeBuilder = table.newStreamWriteBuilder();
write = writeBuilder.newWrite();
Expand Down

0 comments on commit 0b2cc15

Please sign in to comment.