Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add TimeTravelUtils to resolve snapshot from scan params #4307

Merged
merged 16 commits into from
Oct 14, 2024

Conversation

xuzifu666
Copy link
Member

@xuzifu666 xuzifu666 commented Oct 11, 2024

Purpose

Add TimeTravelUtils to resolve snapshot from scan params

Linked issue: close #xxx

Tests

API and Format

Documentation

@LinMingQiang
Copy link
Contributor

LinMingQiang commented Oct 11, 2024

Expansion :We can support more scan startup mode, like timestamp.

Add parameters : consumer.scan.startup.mode = snapshot/timestamp

@xuzifu666
Copy link
Member Author

xuzifu666 commented Oct 11, 2024

Expansion :We can support more scan startup mode, like timestamp.

Add parameters : consumer.scan.startup.mode = snapshot/timestamp

Thanks @LinMingQiang ,good reminder, had added timestamp config, but mode not add due to user not need add two config if they use snapshot or timestamp.

@xuzifu666 xuzifu666 changed the title [core] Support set snapshot-id when query by consumer-id [core] Support specify snapshot-id and timestamp when query by consumer-id Oct 11, 2024
@JingsongLi
Copy link
Contributor

JingsongLi commented Oct 11, 2024

Maybe you want /*+ OPTIONS('consumer-id'='myid', 'scan.snapshot-id'='3', 'consumer.ignore-progress' = 'true') */?

@xuzifu666
Copy link
Member Author

Maybe you want /*+ OPTIONS('consumer-id'='myid', 'scan.snapshot-id'='3', 'consumer.ignore-progress' = 'true') */?

Yes, it is a direct way, I had test for it, can query right snapshot and reset consumer. Thanks @JingsongLi

@xuzifu666 xuzifu666 changed the title [core] Support specify snapshot-id and timestamp when query by consumer-id [core] Support TimeTravelUtils to resolve snapshot from scan params Oct 11, 2024
import static org.junit.jupiter.api.Assertions.assertThrows;

/** Tests for {@link TimeTravelUtilsTest}. */
public class TimeTravelUtilsTest extends ScannerTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@link TimeTravelUtil

@xuzifu666 xuzifu666 changed the title [core] Support TimeTravelUtils to resolve snapshot from scan params [core] Add TimeTravelUtils to resolve snapshot from scan params Oct 11, 2024
CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_WATERMARK.key(),
CoreOptions.SCAN_TIMESTAMP_MILLIS.key()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use checkArgument will be better.

scanKeys.add(CoreOptions.SCAN_TAG_NAME.key());
scanKeys.add(CoreOptions.SCAN_WATERMARK.key());
scanKeys.add(CoreOptions.SCAN_TIMESTAMP_MILLIS.key());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use String[]{} is better.

if (scanHandleKey.size() > 1) {
throw new IllegalArgumentException(
String.format(
"%s %s %s and %s can contains only one",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only one of the following parameters may be set : [%s, %s, %s]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @LinMingQiang had addressed

@xuzifu666 xuzifu666 closed this Oct 11, 2024
@xuzifu666 xuzifu666 reopened this Oct 11, 2024

public static Snapshot resolveSnapshotFromOption(
CoreOptions options, SnapshotManager snapshotManager) {
List<String> scanHandleKey = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List scanHandleKey = new ArrayList<>(1);

/** Contains time travel functions. */
public class TimeTravelUtil {

public static String[] scanKeys = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static final String[] SCAN_KEYS = {

CoreOptions.SCAN_TIMESTAMP_MILLIS.key()
};

public static Snapshot resolveSnapshotFromOption(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolveSnapshotFromOptions

}
}

if (scanHandleKey.size() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a warn log?

return snapshot;
}

private static Snapshot handleSnapshotId(SnapshotManager snapshotManager, CoreOptions options) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolveSnapshotBySnapshotId

optMap.put("scan.timestamp-millis", ts + "");
options = CoreOptions.fromMap(optMap);
snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager);
assertThat(snapshot.id() == 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertTrue

optMap.put("scan.snapshot-id", "2");
CoreOptions options = CoreOptions.fromMap(optMap);
Snapshot snapshot = TimeTravelUtil.resolveSnapshotFromOption(options, snapshotManager);
assertThat(snapshot.id() == 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add assert snapshot.id() is not null

public class TimeTravelUtilsTest extends ScannerTestBase {

@Test
public void testScan() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testResolveSnapshotFromOptions


write.write(rowData(1, 10, 100L));
commit.commit(0, write.prepareCommit(true, 0));
long ts = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline this

write.write(rowData(3, 50, 500L));
commit.commit(2, write.prepareCommit(true, 2));

HashMap<String, String> optMap = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map<String, String> optMap = new HashMap<>(4);

@xuzifu666
Copy link
Member Author

@wwj6591812 had addressd, Thanks.

@wwj6591812
Copy link
Contributor

+1

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit aed75ed into apache:master Oct 14, 2024
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants