Skip to content

Commit

Permalink
[BUG] Streaming bulk request hangs (#16158)
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta authored Oct 1, 2024
1 parent 0b1650d commit 1ee858f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Avoid infinite loop when `flat_object` field contains invalid token ([#15985](https://github.com/opensearch-project/OpenSearch/pull/15985))
- Fix infinite loop in nested agg ([#15931](https://github.com/opensearch-project/OpenSearch/pull/15931))
- Fix race condition in node-join and node-left ([#15521](https://github.com/opensearch-project/OpenSearch/pull/15521))
- Streaming bulk request hangs ([#16158](https://github.com/opensearch-project/OpenSearch/pull/16158))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Locale;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Expand Down Expand Up @@ -297,4 +298,31 @@ public void testStreamingBadStream() throws IOException {
assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());
}

public void testStreamingLargeDocument() throws IOException {
final Stream<String> stream = Stream.of(
String.format(
Locale.getDefault(),
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n{ \"name\": \"%s\" }\n",
randomAlphaOfLength(5000)
)
);

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"type\":\"illegal_argument_exception\""))
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ public void sendResponse(RestResponse response) {
prepareResponse(response.status(), Map.of("Content-Type", List.of(response.contentType())));
}

Mono.ignoreElements(this).then(Mono.just(response)).subscribe(delegate::sendResponse);
Mono.from(this).ignoreElement().then(Mono.just(response)).subscribe(delegate::sendResponse);
}

@Override
Expand Down

0 comments on commit 1ee858f

Please sign in to comment.