Skip to content

Commit

Permalink
scan-query-stream: add stream scan query
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Jan 31, 2024
1 parent 3f96e88 commit 47750cf
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS,

private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, RESULT> statement, PARAMS params) {
List<RESULT> result = new ArrayList<>();
try (Stream<RESULT> stream = doExecuteScanQuery(statement, params)) {
try (Stream<RESULT> stream = executeScanQuery(statement, params)) {
stream.forEach(r -> {
if (result.size() >= options.getScanOptions().getMaxSize()) {
throw new ResultTruncatedException(
Expand All @@ -367,7 +367,8 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, R
return result;
}

private <PARAMS, RESULT> Stream<RESULT> doExecuteScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.timeout(options.getScanOptions().getTimeout())
.mode(com.yandex.ydb.table.YdbTable.ExecuteScanQueryRequest.Mode.MODE_EXEC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ public T postLoad(T e) {
public interface QueryExecutor {
<PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params);

<PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params);

<PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value);

default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
Expand Down Expand Up @@ -511,6 +513,11 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryLegacy(Statement<PARAMS,

private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, RESULT> statement, PARAMS params) {
List<RESULT> result = new ArrayList<>();
try (Stream<RESULT> stream = doExecuteScanQuery(statement, params)) {
try (Stream<RESULT> stream = executeScanQuery(statement, params)) {
stream.forEach(r -> {
if (result.size() >= options.getScanOptions().getMaxSize()) {
throw new ResultTruncatedException(
Expand All @@ -367,7 +367,8 @@ private <PARAMS, RESULT> List<RESULT> doExecuteScanQueryList(Statement<PARAMS, R
return result;
}

private <PARAMS, RESULT> Stream<RESULT> doExecuteScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(options.getScanOptions().getTimeout())
.setMode(ExecuteScanQuerySettings.Mode.EXEC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,8 @@ public T postLoad(T e) {
public interface QueryExecutor {
<PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params);

<PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params);

<PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value);

default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
Expand Down Expand Up @@ -511,6 +513,11 @@ public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
Expand Down

0 comments on commit 47750cf

Please sign in to comment.