Skip to content

Commit

Permalink
updating writeFile to directly use AsynchronousByteChannelWriteSubscr…
Browse files Browse the repository at this point in the history
…iber with position and emitter (Azure#43232)
  • Loading branch information
ibrahimrabab authored Dec 5, 2024
1 parent b735a8d commit b073412
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,8 @@ public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileCha
} else if (position < 0) {
return monoError(LOGGER, new IllegalArgumentException("'position' cannot be less than 0."));
}

return writeToAsynchronousByteChannel(content, IOUtils.toAsynchronousByteChannel(outFile, position));
return Mono.create(emitter -> content.subscribe(
new AsynchronousByteChannelWriteSubscriber(IOUtils.toAsynchronousByteChannel(outFile, position), emitter)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,52 @@ public void testWriteFileWithPosition() throws Exception {
}
}

@Test
public void testWriteFileWithReSubscriptionAndRetry() throws Exception {
String initialContent = "hello there";
String expectedContent = "hello again";

byte[] firstBytes = "Hello".getBytes(StandardCharsets.UTF_8);
byte[] secondBytes = "test".getBytes(StandardCharsets.UTF_8);
byte[] thirdBytes = "again".getBytes(StandardCharsets.UTF_8);

AtomicInteger attempt = new AtomicInteger(0);

Flux<ByteBuffer> body = Flux.defer(() -> {
int currentAttempt = attempt.incrementAndGet();
if (currentAttempt == 1) {
return Flux.just(ByteBuffer.wrap(firstBytes)).concatWith(Flux.error(new IOException()));
} else if (currentAttempt == 2) {
return Flux.just(ByteBuffer.wrap(secondBytes)).concatWith(Flux.error(new IOException()));
} else {
return Flux.just(ByteBuffer.wrap(thirdBytes));
}
});

File file = createFileIfNotExist();

try (FileOutputStream stream = new FileOutputStream(file)) {
stream.write(initialContent.getBytes(StandardCharsets.UTF_8));
}

StepVerifier
.create(Mono.fromCallable(() -> AsynchronousFileChannel.open(file.toPath(), StandardOpenOption.WRITE))
.flatMap(channel -> FluxUtil.writeFile(body, channel, 6)
.retry(2) // Retry twice, third should succeed
.then(Mono.fromCallable(() -> Files.readAllBytes(file.toPath())).doFinally(signalType -> {
try {
channel.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}))))
.expectNextMatches(outputStream -> {
assertArraysEqual(expectedContent.getBytes(StandardCharsets.UTF_8), outputStream);
return true;
})
.verifyComplete();
}

@Test
public void testWriteWritableChannel() {
String content = "test";
Expand Down

0 comments on commit b073412

Please sign in to comment.