refactor: enforce using same column data key
dantengsky committed Nov 13, 2024
1 parent c368bad commit 634792f
Showing 1 changed file with 23 additions and 8 deletions.
@@ -48,6 +48,9 @@ impl BlockReader {
         let column_array_cache = CacheManager::instance().get_table_data_array_cache();
         let mut cached_column_data = vec![];
         let mut cached_column_array = vec![];
+
+        let column_cache_key_builder = ColumnCacheKeyBuilder::new(location);
+
         for (_index, (column_id, ..)) in self.project_indices.iter() {
             if let Some(ignore_column_ids) = ignore_column_ids {
                 if ignore_column_ids.contains(column_id) {
@@ -58,7 +61,7 @@
             if let Some(column_meta) = columns_meta.get(column_id) {
                 let (offset, len) = column_meta.offset_length();

-                let column_cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                let column_cache_key = column_cache_key_builder.cache_cache(column_id, column_meta);

                 // first, check in memory table data cache
                 // column_array_cache
@@ -91,13 +94,8 @@
             .await?;

         if self.put_cache {
-            let table_data_cache = CacheManager::instance().get_table_data_cache();
             // add raw data (compressed raw bytes) to column cache
             for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
-                // Safe to unwrap here, since this column has been fetched, its meta must be present.
-                let column_meta = columns_meta.get(column_id).unwrap();
-                let (offset, len) = column_meta.offset_length();
-
                 // Should NOT use `range.start` as part of the cache key,
                 // as they are not stable and can vary for the same column depending on the query's projection.
                 // For instance:
@@ -106,12 +104,15 @@
                 // may result in different ranges for `col2`
                 // This can lead to cache missing or INCONSISTENCIES

-                let cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                // Safe to unwrap here, since this column has been fetched, its meta must be present.
+                let column_meta = columns_meta.get(column_id).unwrap();
+                let column_cache_key = column_cache_key_builder.cache_cache(column_id, column_meta);

                 let chunk_data = merge_io_result
                     .owner_memory
                     .get_chunk(*chunk_idx, &merge_io_result.block_path)?;
                 let data = chunk_data.slice(range.clone());
-                table_data_cache.insert(cache_key.as_ref().to_owned(), data);
+                column_data_cache.insert(column_cache_key.as_ref().to_owned(), data);
             }
         }

@@ -123,3 +124,17 @@ impl BlockReader {
         Ok(block_read_res)
     }
 }
+
+struct ColumnCacheKeyBuilder<'a> {
+    block_path: &'a str,
+}
+
+impl<'a> ColumnCacheKeyBuilder<'a> {
+    fn new(block_path: &'a str) -> Self {
+        Self { block_path }
+    }
+    fn cache_cache(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
+        let (offset, len) = column_meta.offset_length();
+        TableDataCacheKey::new(self.block_path, *column_id, offset, len)
+    }
+}
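
The comment preserved across the last two hunks carries the reasoning behind the change: `range.start` is an offset into the merged IO chunk, so it shifts with the set of columns a query projects, while the `(offset, len)` pair recorded in the column meta describes the column's position in the block file and never changes. Below is a minimal standalone sketch of that argument; `ColumnMeta` and `meta_key` are illustrative stand-ins, not Databend's real `ColumnMeta` and `TableDataCacheKey` types.

    // Sketch only: stand-in types, not Databend's actual API.
    use std::collections::HashMap;

    type ColumnId = u32;

    // Position of a column's compressed bytes within the block file,
    // as recorded in the block's metadata. Projection-independent.
    #[derive(Clone, Copy)]
    struct ColumnMeta {
        offset: u64,
        len: u64,
    }

    // Key built from stable facts only: block path + column id + meta.
    fn meta_key(block_path: &str, id: ColumnId, meta: ColumnMeta) -> String {
        format!("{}-{}-{}-{}", block_path, id, meta.offset, meta.len)
    }

    fn main() {
        let metas: HashMap<ColumnId, ColumnMeta> = HashMap::from([
            (1, ColumnMeta { offset: 0, len: 100 }),   // col1
            (2, ColumnMeta { offset: 100, len: 200 }), // col2
        ]);

        // Merged IO reads only the projected columns into one chunk, so
        // col2's start offset *within the chunk* depends on the projection:
        let range_start_select_col1_col2 = 100u64; // chunk = col1 bytes + col2 bytes
        let range_start_select_col2_only = 0u64;   // chunk = col2 bytes alone

        // Keys that embed range.start diverge across queries: the same
        // bytes get cached under two names, or a lookup misses.
        let bad_a = format!("blk_001-2-{}", range_start_select_col1_col2);
        let bad_b = format!("blk_001-2-{}", range_start_select_col2_only);
        assert_ne!(bad_a, bad_b);

        // A key derived from the meta is identical under both projections.
        let good = meta_key("blk_001", 2, metas[&2]);
        assert_eq!(good, meta_key("blk_001", 2, metas[&2]));
    }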

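The new `ColumnCacheKeyBuilder` then turns that stability from a convention into a structural guarantee: the block path is bound once per read, and both call sites (the cache lookup before issuing IO and the insert after it) must go through the same method, so their keys can no longer drift apart. That is the "enforce" in the commit title. Roughly, with the same stand-in types as above (the commit names the method `cache_cache`; the sketch calls it `cache_key` for readability):

    // Sketch only: mirrors the shape of the commit's builder, not its exact API.
    use std::collections::HashMap;

    type ColumnId = u32;

    #[derive(Clone, Copy)]
    struct ColumnMeta {
        offset: u64,
        len: u64,
    }

    // One builder per block: the path is bound once, so every key for this
    // block is derived from the same (path, column_id, offset, len) tuple.
    struct ColumnCacheKeyBuilder<'a> {
        block_path: &'a str,
    }

    impl<'a> ColumnCacheKeyBuilder<'a> {
        fn new(block_path: &'a str) -> Self {
            Self { block_path }
        }

        fn cache_key(&self, column_id: ColumnId, meta: ColumnMeta) -> String {
            format!("{}-{}-{}-{}", self.block_path, column_id, meta.offset, meta.len)
        }
    }

    fn main() {
        let builder = ColumnCacheKeyBuilder::new("blk_001");
        let meta = ColumnMeta { offset: 100, len: 200 };
        let mut cache: HashMap<String, Vec<u8>> = HashMap::new();

        // Populate path and lookup path both delegate to the builder,
        // so a hit is guaranteed to use the key the insert used.
        cache.insert(builder.cache_key(2, meta), vec![0u8; 8]);
        assert!(cache.get(&builder.cache_key(2, meta)).is_some());
    }

Borrowing `block_path` as `&'a str` for the lifetime of the read, as the commit does, also avoids cloning the path once per column when building keys in the projection loop.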