Should not use seek() to skip very small column chunks; better to read and ignore the data. #3076

Open
Arnaud-Nauwynck opened this issue Nov 23, 2024 · 4 comments


@Arnaud-Nauwynck
Contributor

Describe the enhancement requested

When reading some but not all column chunks, Parquet builds a list of "ConsecutivePartList" objects, then tries to call the Hadoop vectored-read API FSDataInputStream#readVectored(List ...).

Unfortunately, many FSDataInputStream implementations do not override the readVectored() method, so the default implementation triggers many distinct read calls.
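To make that cost concrete, here is a simplified model (not Hadoop's actual code) of a stream whose readVectored() falls back to one positioned read per range. The Range and CountingStream types are hypothetical, and the sketch assumes that every non-contiguous positioned read costs one new remote request.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (NOT Hadoop's code) of a stream whose readVectored()
 * falls back to one positioned read per range.
 * Assumption: every non-contiguous positioned read costs one new remote request.
 */
public class FallbackVectoredRead {

    /** Hypothetical stand-in for Hadoop's FileRange: bytes [offset, offset + length). */
    record Range(long offset, int length) {}

    /** Hypothetical remote stream that counts the requests it would issue. */
    static class CountingStream {
        private final byte[] data;
        private long nextSequentialPos = -1;
        int requests = 0;

        CountingStream(byte[] data) {
            this.data = data;
        }

        /** Positioned read; counts a new request unless it continues the previous read. */
        void readFully(long position, byte[] buffer) {
            if (position != nextSequentialPos) {
                requests++;
            }
            System.arraycopy(data, (int) position, buffer, 0, buffer.length);
            nextSequentialPos = position + buffer.length;
        }
    }

    /** Fallback behaviour: one readFully() per range, with no merging of nearby ranges. */
    static List<byte[]> readVectoredFallback(CountingStream in, List<Range> ranges) {
        List<byte[]> results = new ArrayList<>();
        for (Range r : ranges) {
            byte[] buf = new byte[r.length()];
            in.readFully(r.offset(), buf);
            results.add(buf);
        }
        return results;
    }

    public static void main(String[] args) {
        CountingStream in = new CountingStream(new byte[10_000]);
        // Three wanted column chunks separated by 100-byte holes (unread columns).
        List<Range> ranges = List.of(new Range(0, 1000), new Range(1100, 1000), new Range(2200, 1000));
        readVectoredFallback(in, ranges);
        System.out.println("requests = " + in.requests); // prints "requests = 3"
    }
}
```

In this model, three ranges covering about 3 KB cost three round trips, only because 200 bytes of holes separate them.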

For example, with hadoop-azure, Azure Data Lake Storage is much slower at establishing a new HTTPS connection (using the infamous HttpURLConnection API from the early JDK days, then doing a TLS handshake) than at fetching a few more megabytes of data over an existing socket!

Small holes to skip are very frequent when a Parquet file has columns that are not read and that are highly compressed thanks to RLE encoding: typically a very sparse column with only a few values, or a column that is always null within a page. Such a column may be encoded by Parquet in only a few hundred bytes, so reading 100 extra bytes is NOT a problem.

Parquet should at least honor the following method from the Hadoop interface PositionedReadable (implemented by FSDataInputStream), whose default says that a seek of less than 4096 bytes is NOT reasonable:

  /**
   * What is the smallest reasonable seek?
   * @return the minimum number of bytes
   */
  default int minSeekForVectorReads() {
    return 4 * 1024;
  }

The logic that builds this list from the column chunks is in
org.apache.parquet.hadoop.ParquetFileReader#internalReadRowGroup:

  private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException {
    ...
    for (ColumnChunkMetaData mc : block.getColumns()) {
        ...
        // first part or not consecutive => new list
        if (currentParts == null || currentParts.endPos() != startingPos) {  // <===== SHOULD honor minSeekForVectorReads()
          currentParts = new ConsecutivePartList(startingPos);
          allParts.add(currentParts);
        }
        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
      }
    }
    // actually read all the chunks
    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
    readAllPartsVectoredOrNormal(allParts, builder);
    rowGroup.setReleaser(builder.releaser);
    for (Chunk chunk : builder.build()) {
      readChunkPages(chunk, block, rowGroup);
    }

    return rowGroup;
  }
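A minimal sketch of how the grouping loop above could honor minSeekForVectorReads(): a chunk joins the current part when the hole before it is smaller than the minimum seek, so that hole is read and discarded instead of starting a new range. Chunk, Part and group() are hypothetical simplifications of ChunkDescriptor, ConsecutivePartList and the loop, not Parquet's code; a real change would also have to skip the hole bytes when slicing the fetched buffer into per-chunk pages.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simplification of the row-group grouping loop that tolerates
 * holes smaller than minSeek (standing in for minSeekForVectorReads()).
 */
public class GapTolerantGrouping {

    /** A column chunk to read: bytes [start, start + length). */
    record Chunk(String column, long start, long length) {
        long end() {
            return start + length;
        }
    }

    /** One range to fetch; it may span small unread holes between chunks. */
    static final class Part {
        final long start;
        long end;
        final List<Chunk> chunks = new ArrayList<>();

        Part(long start) {
            this.start = start;
            this.end = start;
        }

        void add(Chunk c) {
            chunks.add(c);
            end = c.end();
        }
    }

    /**
     * Groups chunks sorted by start offset. A new part starts only when the
     * hole since the previous chunk is at least minSeek bytes (or negative,
     * i.e. out of order); smaller holes are read and discarded.
     */
    static List<Part> group(List<Chunk> chunks, long minSeek) {
        List<Part> parts = new ArrayList<>();
        Part current = null;
        for (Chunk c : chunks) {
            long gap = (current == null) ? Long.MAX_VALUE : c.start() - current.end;
            if (current == null || gap < 0 || gap >= minSeek) {
                current = new Part(c.start());
                parts.add(current);
            }
            current.add(c);
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Chunk> chunks = List.of(
            new Chunk("a", 0, 1_000_000),
            new Chunk("c", 1_000_200, 500_000),  // 200-byte hole: column "b" is not read
            new Chunk("e", 2_000_000, 300_000)); // large hole: still worth a seek
        for (Part p : group(chunks, 4 * 1024)) {
            System.out.println("read [" + p.start + ", " + p.end + ") for " + p.chunks.size() + " chunk(s)");
        }
    }
}
```

With minSeek = 1 this reduces to the existing `currentParts.endPos() != startingPos` rule, so today's behavior is a special case of the threshold.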

A possible implementation could be to add fictive "ConsecutivePartList" entries covering the holes: their data would be ignored when received, but they would avoid having holes in the ranges to read.
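The fictive-part alternative can be sketched as follows: a filler range is inserted for every hole shorter than minSeek, its data is fetched like any other range, and it is dropped on receipt. WantedRange, ReadRange, fillSmallHoles() and dropFillers() are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of "fictive" filler ranges that close small holes. */
public class FillerRanges {

    /** A range the reader actually needs: bytes [offset, offset + length). */
    record WantedRange(long offset, long length) {}

    /** A range to fetch; filler ranges are fetched only to avoid a seek. */
    record ReadRange(long offset, long length, boolean filler) {}

    /** Inserts a filler range for every hole shorter than minSeek (input sorted by offset). */
    static List<ReadRange> fillSmallHoles(List<WantedRange> wanted, long minSeek) {
        List<ReadRange> out = new ArrayList<>();
        long prevEnd = 0;
        boolean first = true;
        for (WantedRange w : wanted) {
            long hole = w.offset() - prevEnd;
            if (!first && hole > 0 && hole < minSeek) {
                out.add(new ReadRange(prevEnd, hole, true)); // fetched, then ignored
            }
            out.add(new ReadRange(w.offset(), w.length(), false));
            prevEnd = w.offset() + w.length();
            first = false;
        }
        return out;
    }

    /** On receipt, keep only the real ranges; filler data is discarded. */
    static List<ReadRange> dropFillers(List<ReadRange> received) {
        List<ReadRange> kept = new ArrayList<>();
        for (ReadRange r : received) {
            if (!r.filler()) {
                kept.add(r);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<WantedRange> wanted = List.of(
            new WantedRange(0, 1000),
            new WantedRange(1100, 1000),    // 100-byte hole: filled
            new WantedRange(50_000, 1000)); // 47,900-byte hole: left as a seek
        List<ReadRange> plan = fillSmallHoles(wanted, 4 * 1024);
        plan.forEach(System.out::println);
        System.out.println("kept after receipt: " + dropFillers(plan).size()); // prints 3
    }
}
```

Compared with widening the parts themselves, this keeps each real chunk's range untouched and moves the "ignore" logic to the receiving side, at the cost of carrying extra buffers for filler data.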

Component(s)

No response

@Arnaud-Nauwynck
Contributor Author

See also the related issue #3077: AzureBlobFileSystem.open() should return an FSDataInputStream subclass that overrides readVectored() to handle small reads much more efficiently.

@steveloughran
Contributor

  • I think we should revisit that min seek range: at least add a good default for ABFS, and make it configurable. File a HADOOP- JIRA, ideally with a patch.
  • We have long hoped for a full vectored-read implementation for ABFS, though it has yet to surface.
  • In Hadoop 3.4.1 the ABFS connector also supports the httpclient connection pool for better performance, though that is still stabilising.

Anyway, I agree with you that read-and-discard is better for cloud stores; what I don't know is what a good value is here.

What do you think, at least in your deployments?

The Velox paper actually sets the value at 500K:

IO reads for nearby columns are typically coalesced (merged) if the gap between them is small enough (currently about 20K for SSD and 500K for disaggregated storage)

Maybe in Hadoop we should:

  1. change the default size to 20K
  2. set it to 500K for s3a and abfs
  3. review the GCS settings too, though that's in Google's code.
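A sketch of how those proposed defaults could be resolved per filesystem scheme, with a configuration override as suggested above. The MinSeekDefaults class and the `hypothetical.min.seek.<scheme>` key are made up for illustration, not an existing Hadoop API; 20K and 500K are taken as 20 KiB and 500 KiB, and GCS is left on the generic default since it lives in Google's connector.

```java
import java.util.Map;

/** Hypothetical resolver for the proposed per-store minimum seek sizes. */
public class MinSeekDefaults {

    static final int GENERIC_DEFAULT = 20 * 1024;       // proposed general default (~20K)
    static final int OBJECT_STORE_DEFAULT = 500 * 1024; // proposed s3a/abfs default (~500K)

    /** Returns the minimum seek for a filesystem scheme; configuration overrides the default. */
    static int minSeekFor(String scheme, Map<String, String> conf) {
        String override = conf.get("hypothetical.min.seek." + scheme); // hypothetical key
        if (override != null) {
            return Integer.parseInt(override.trim());
        }
        switch (scheme) {
            case "s3a":
            case "abfs":
            case "abfss":
                return OBJECT_STORE_DEFAULT;
            default:
                return GENERIC_DEFAULT;
        }
    }

    public static void main(String[] args) {
        System.out.println(minSeekFor("s3a", Map.of()));  // prints 512000
        System.out.println(minSeekFor("file", Map.of())); // prints 20480
        System.out.println(minSeekFor("abfs", Map.of("hypothetical.min.seek.abfs", "65536"))); // prints 65536
    }
}
```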

@mukund-thakur
Contributor

Yes, we wanted to run more TPC-DS performance tests with different min seek values before deciding on a good default, but those tests are expensive and time-consuming to run, which is why we haven't done it yet. It would be good to know if you have any performance benchmark numbers.

@steveloughran
Contributor

I think we could do some micro benchmarking here as well.

What we want is a skip size below which it is faster to read and discard the data than to wait for/acquire an HTTP connection and download the data across multiple threads.

Acquisition time is hard to pin down, because it depends on the HTTP connection pool size and on wait times that may be imposed by the actions of other threads. The same goes for thread-pool scheduling overhead.

Download time is a function of bandwidth alone.

We could ignore the HTTPS and thread delays and focus on the time from GET to "first byte", which would be entirely that imposed by the cloud store itself. Then all we care about is that the time to download the skipped data is less than the GET-to-first-byte latency.

That is, when:

bytes-to-skip/bandwidth < t(GET-to-first-byte)
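Multiplying both sides by the bandwidth turns the condition into a break-even skip size. Here $S_{\text{skip}}$ is bytes-to-skip, $B$ the download bandwidth and $t_{\text{TTFB}}$ the GET-to-first-byte latency:

```math
\frac{S_{\text{skip}}}{B} < t_{\text{TTFB}}
\quad\Longleftrightarrow\quad
S_{\text{skip}} < B \cdot t_{\text{TTFB}}
```

For illustration only (not measured values): with $B = 100$ MB/s and $t_{\text{TTFB}} = 50$ ms, holes smaller than $100\ \text{MB/s} \times 0.05\ \text{s} = 5$ MB would be cheaper to read and discard than to skip with a new GET.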

We could benchmark this: preheat the pool with a few HEAD calls, then issue a set of single-byte read() calls at different offsets of a large file, treating the time for each call to return as the time to first byte.

Then work out the download bandwidth, and from it how many bytes can be transferred in that time to first byte.
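A rough local-filesystem version of that benchmark might look like the sketch below; it is not the cloudstore implementation. A few warm-up reads stand in for the HEAD calls, single-byte positioned reads at pseudo-random offsets give the time to first byte, and one sequential pass gives the bandwidth. On a local file the OS page cache will dominate both numbers; for a cloud store the FileChannel would have to be replaced by that store's input stream. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Random;

/** Hypothetical micro-benchmark: break-even skip size = bandwidth x time to first byte. */
public class SkipThresholdBench {

    record Result(double meanFirstByteNanos, double bytesPerNano) {
        /** Bytes that can be downloaded in one time-to-first-byte interval. */
        long thresholdBytes() {
            return (long) (bytesPerNano * meanFirstByteNanos);
        }
    }

    static Result measure(Path file, int probes, long seed) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = ch.size();
            if (size < 2) {
                throw new IllegalArgumentException("file too small");
            }
            ByteBuffer one = ByteBuffer.allocate(1);

            // Warm-up reads, standing in for "preheat the pool with a few HEAD calls".
            for (int i = 0; i < 3; i++) {
                one.clear();
                ch.read(one, 0);
            }

            // Single-byte positioned reads at scattered offsets: time to first byte.
            Random rnd = new Random(seed);
            long total = 0;
            for (int i = 0; i < probes; i++) {
                long pos = (long) (rnd.nextDouble() * size); // in [0, size)
                one.clear();
                long t0 = System.nanoTime();
                ch.read(one, pos);
                total += System.nanoTime() - t0;
            }
            double meanTtfb = (double) total / probes;

            // One sequential pass over the file: bandwidth estimate.
            ByteBuffer buf = ByteBuffer.allocate(1 << 20);
            long read = 0;
            ch.position(0);
            long t0 = System.nanoTime();
            while (true) {
                buf.clear();
                int n = ch.read(buf);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            long elapsed = Math.max(1, System.nanoTime() - t0);
            return new Result(meanTtfb, (double) read / elapsed);
        }
    }

    public static void main(String[] args) throws IOException {
        Result r = measure(Path.of(args[0]), 100, 42L);
        System.out.printf("mean time to first byte: %.1f us%n", r.meanFirstByteNanos() / 1_000);
        System.out.printf("bandwidth: %.1f MB/s%n", r.bytesPerNano() * 1_000);
        System.out.printf("break-even skip size: %d bytes%n", r.thresholdBytes());
    }
}
```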

I will take a small PR to cloudstore for this; it would be interesting to see the values for the local FS as well as for the different cloud stores, both local and remote. Even: do different VM types matter?

@Arnaud-Nauwynck try setting http.maxConnections to a number like 100; this will improve the caching of HTTP connections.
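http.maxConnections is a standard JDK networking property: when HTTP keep-alive is enabled, it caps how many idle connections HttpURLConnection keeps alive per destination (the default is 5). It only affects code paths that go through HttpURLConnection. The usual way to set it is a JVM flag such as `-Dhttp.maxConnections=100`; the sketch below sets it programmatically instead, using a hypothetical helper, and it is safest to run it before any HTTP connection is opened.

```java
/** Hypothetical helper that raises the JDK keep-alive cache limit if not already set. */
public class HttpKeepAliveConfig {

    /**
     * Sets http.maxConnections unless it was already given (e.g. via
     * -Dhttp.maxConnections=100), then returns the effective value.
     * Call this early, before the first HttpURLConnection is used.
     */
    static String configureMaxConnections(int max) {
        if (System.getProperty("http.maxConnections") == null) {
            System.setProperty("http.maxConnections", Integer.toString(max));
        }
        return System.getProperty("http.maxConnections");
    }

    public static void main(String[] args) {
        System.out.println("http.maxConnections = " + configureMaxConnections(100));
    }
}
```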

3 participants