Skip to content

Commit

Permalink
full blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani committed Dec 21, 2024
1 parent 78c8227 commit 9643a3e
Showing 1 changed file with 72 additions and 32 deletions.
104 changes: 72 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use duckdb_loadable_macros::duckdb_entrypoint_c_api;
use libduckdb_sys as ffi;
use byteorder::{ReadBytesExt, LittleEndian};

// All original type definitions remain exactly the same
#[allow(dead_code)]
#[derive(Debug)]
enum ColumnType {
Expand All @@ -31,6 +30,7 @@ struct Column {
/// State stored while binding the table function: the path of the file to read.
///
/// The path is kept twice: as a raw pointer for C-style access and as the
/// owning `CString` that backs that pointer.
#[repr(C)]
struct ClickHouseBindData {
// Raw view of the path. `bind` sets it from the `as_ptr()` of the string kept
// in `_filepath_holder`, so it borrows that buffer rather than owning one.
filepath: *mut c_char,
// Owner of the buffer `filepath` points into; `free` takes it out so the
// allocation is released when the `CString` drops.
_filepath_holder: Option<CString>,
}

#[repr(C)]
Expand All @@ -43,7 +43,8 @@ struct ClickHouseInitData {

impl Free for ClickHouseBindData {
    /// Releases the stored file path.
    ///
    /// `filepath` only aliases the buffer owned by `_filepath_holder`, so the
    /// holder is the single owner to drop. The pointer must not be passed to
    /// `CString::from_raw`: it comes from `as_ptr()`, not `into_raw()`, and
    /// rebuilding a `CString` from it would free the same allocation a second
    /// time when the holder is dropped.
    fn free(&mut self) {
        // Clear the alias first so no dangling pointer outlives the buffer.
        self.filepath = std::ptr::null_mut();
        // Dropping the owning `CString` releases the allocation exactly once.
        self._filepath_holder = None;
    }
}

Expand All @@ -53,7 +54,6 @@ impl Free for ClickHouseInitData {
}
}

// All original functions remain exactly the same
fn read_string(reader: &mut impl Read) -> io::Result<String> {
let len = reader.read_u8()?;
let mut buffer = vec![0; len as usize];
Expand Down Expand Up @@ -90,11 +90,17 @@ fn parse_column_type(type_str: &str) -> (ColumnType, Option<String>) {

fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64) -> io::Result<Vec<ColumnData>> {
let mut data = Vec::with_capacity(rows as usize);
for _ in 0..rows {
for i in 0..rows {
let value = match column_type {
ColumnType::UInt64 => {
let val = reader.read_u64::<LittleEndian>()?;
// if i < 5 || i == rows - 1 {
// println!("Row {}: {}", i, val);
// }
ColumnData::UInt64(val)
},
ColumnType::String => ColumnData::String(read_string(reader)?),
ColumnType::UInt8 | ColumnType::Enum8 => ColumnData::UInt8(reader.read_u8()?),
ColumnType::UInt64 => ColumnData::UInt64(reader.read_u64::<LittleEndian>()?),
ColumnType::Int => ColumnData::Int(reader.read_i32::<LittleEndian>()?),
ColumnType::Unsupported(_) => ColumnData::Null,
};
Expand All @@ -105,43 +111,77 @@ fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64)

/// Decodes an unsigned LEB128 ("VarUInt") value from `reader`.
///
/// Each byte contributes its low seven bits, least-significant group first;
/// a clear high bit marks the final byte. At most ten bytes are consumed.
///
/// # Errors
///
/// Propagates any I/O error from `reader` (including `UnexpectedEof` when the
/// input ends before the value does). Returns `InvalidData` if the value does
/// not terminate within ten bytes, or if a byte carries payload bits that do
/// not fit in a `u64`.
fn read_var_u64(reader: &mut impl Read) -> io::Result<u64> {
    const MAX_BYTES: u32 = 10;

    let mut value = 0u64;
    for index in 0..MAX_BYTES {
        let mut buf = [0u8; 1];
        reader.read_exact(&mut buf)?;
        let byte = buf[0];

        let payload = u64::from(byte & 0x7F);
        let shift = 7 * index;
        let shifted = payload << shift;
        // Only the tenth byte (shift 63) can lose bits here; reject it rather
        // than silently returning a truncated value.
        if shifted >> shift != payload {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "VarUInt overflows u64",
            ));
        }
        value |= shifted;

        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }

    Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid VarUInt"))
}

/// Advances `reader` past a block header: a 4-byte marker followed by two
/// strings, each stored as a one-byte length and then that many bytes.
///
/// # Errors
///
/// Returns the underlying I/O error if the marker or a length byte cannot be
/// read, or if the relative seek fails.
fn skip_block_header(reader: &mut BufReader<File>) -> io::Result<()> {
    const MARKER_LEN: usize = 4;
    const HEADER_STRINGS: usize = 2;

    // The marker's value is never inspected, but it must be fully present.
    reader.read_exact(&mut [0u8; MARKER_LEN])?;

    // Step over each string without copying its bytes.
    (0..HEADER_STRINGS).try_for_each(|_| {
        let len = i64::from(reader.read_u8()?);
        reader.seek_relative(len)
    })
}

fn read_native_format(reader: &mut BufReader<File>) -> io::Result<Vec<Column>> {
// Read number of columns
let num_columns = read_var_u64(reader)?;
let mut columns: Vec<Column> = Vec::new();
let mut is_first_block = true;
let mut columns = Vec::new();

// Read block size
let num_rows = read_var_u64(reader)?;

// Read column definitions and first block data
for _ in 0..num_columns {
let name = read_string(reader)?;
let type_str = read_string(reader)?;
let (column_type, type_params) = parse_column_type(&type_str);
let data = read_column_data(reader, &column_type, num_rows)?;
columns.push(Column { name, type_: column_type, type_params, data });
}

// Read subsequent blocks
loop {
let num_rows = read_var_u64(reader)?;

if is_first_block {
for _ in 0..num_columns {
let name = read_string(reader)?;
let type_str = read_string(reader)?;
let (column_type, type_params) = parse_column_type(&type_str);
let data = read_column_data(reader, &column_type, num_rows)?;
columns.push(Column { name, type_: column_type, type_params, data });
}
is_first_block = false;
} else {
for col in &mut columns {
let data = read_column_data(reader, &col.type_, num_rows)?;
col.data.extend(data);
}
}
// Try to read number of columns for next block
let pos = reader.stream_position()?;
let block_columns = match read_var_u64(reader) {
Ok(cols) => cols,
Err(_) => break, // End of file
};

let block_rows = read_var_u64(reader)?;

if num_rows < 65409 {
if block_rows == 0 {
break;
}

// Skip column definitions for the block (they should match)
for _ in 0..block_columns {
let _ = read_string(reader)?; // column name
let _ = read_string(reader)?; // column type
}

// Read block data
for col in &mut columns {
let mut new_data = read_column_data(reader, &col.type_, block_rows)?;
col.data.append(&mut new_data);
}
}

Ok(columns)
Expand Down Expand Up @@ -172,10 +212,12 @@ impl VTab for ClickHouseVTab {
bind.add_result_column(&column.name, LogicalTypeHandle::from(logical_type));
}

// Convert and store filepath
let c_filepath = CString::new(filepath)?;
let filepath_cstring = CString::new(filepath)?;
let raw_ptr = filepath_cstring.as_ptr() as *mut c_char;

unsafe {
(*data).filepath = c_filepath.into_raw();
(*data).filepath = raw_ptr;
(*data)._filepath_holder = Some(filepath_cstring);
}

Ok(())
Expand All @@ -188,7 +230,6 @@ impl VTab for ClickHouseVTab {
let file = File::open(filepath)?;
let mut reader = BufReader::with_capacity(64 * 1024, file);

// Move columns directly into the data structure without intermediate assignment
let read_result = read_native_format(&mut reader)?;
let total_rows = if read_result.is_empty() { 0 } else { read_result[0].data.len() };

Expand All @@ -202,7 +243,6 @@ impl VTab for ClickHouseVTab {
Ok(())
}


unsafe fn func(func: &FunctionInfo, output: &mut DataChunkHandle) -> Result<(), Box<dyn Error>> {
let init_data = func.get_init_data::<ClickHouseInitData>();

Expand Down

0 comments on commit 9643a3e

Please sign in to comment.