Skip to content

Commit

Permalink
HADOOP-19105. factor out start/stop.
Browse files Browse the repository at this point in the history
This preparing to pull this into a superclass so that all s3a
streams get it.

Change-Id: I2d67336ce5ef484ab0207e4b45aa224c869926fe
  • Loading branch information
steveloughran committed Jan 3, 2025
1 parent 13e168e commit 01b4219
Showing 1 changed file with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public synchronized void close() throws IOException {
if (!closed) {
closed = true;
try {
stopVectoredIOOperations.set(true);
stopVectorOperations();
// close or abort the stream; blocking
closeStream("close() operation", false, true);
// end the client+audit span.
Expand Down Expand Up @@ -993,9 +993,7 @@ public void readVectored(final List<? extends FileRange> ranges,
final Consumer<ByteBuffer> release) throws IOException {
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
checkNotClosed();
if (stopVectoredIOOperations.getAndSet(false)) {
LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
}
maybeStartVectorOperations();
// fail fast on parameters which would otherwise only be checked
// in threads and/or in failures.
requireNonNull(allocate , "Null allocator");
Expand Down Expand Up @@ -1039,6 +1037,24 @@ public void readVectored(final List<? extends FileRange> ranges,
" on path {} for ranges {} ", pathStr, ranges);
}

/**
* Start/restart vector operations if not active.
* In particular, after an unbuffer(), this performs any
* initialization required.
*/
private void maybeStartVectorOperations() {
if (stopVectoredIOOperations.getAndSet(false)) {
LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
}
}

/**
* Stop vector operations.
*/
private void stopVectorOperations() {
stopVectoredIOOperations.set(true);
}

/**
* Read the data from S3 for the bigger combined file range and update all the
* underlying ranges.
Expand Down Expand Up @@ -1558,7 +1574,7 @@ public static long validateReadahead(@Nullable Long readahead) {
@Override
public synchronized void unbuffer() {
try {
stopVectoredIOOperations.set(true);
stopVectorOperations();
closeStream("unbuffer()", false, false);
} finally {
streamStatistics.unbuffered();
Expand Down

0 comments on commit 01b4219

Please sign in to comment.