From 9643a3ecd493116f2cee26a2f3b3708bd331ff36 Mon Sep 17 00:00:00 2001
From: lmangani
Date: Sat, 21 Dec 2024 17:24:05 +0000
Subject: [PATCH] full blocks

---
 src/lib.rs | 104 ++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 72 insertions(+), 32 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index fd23d7d..6441e72 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -8,7 +8,6 @@ use duckdb_loadable_macros::duckdb_entrypoint_c_api;
 use libduckdb_sys as ffi;
 use byteorder::{ReadBytesExt, LittleEndian};
 
-// All original type definitions remain exactly the same
 #[allow(dead_code)]
 #[derive(Debug)]
 enum ColumnType {
@@ -31,6 +30,7 @@ struct Column {
 #[repr(C)]
 struct ClickHouseBindData {
     filepath: *mut c_char,
+    _filepath_holder: Option<CString>,
 }
 
 #[repr(C)]
@@ -43,7 +43,8 @@ struct ClickHouseInitData {
 
 impl Free for ClickHouseBindData {
     fn free(&mut self) {
-        unsafe { if !self.filepath.is_null() { drop(CString::from_raw(self.filepath)); } }
+        self._filepath_holder.take();
+        self.filepath = std::ptr::null_mut();
     }
 }
 
@@ -53,7 +54,6 @@ impl Free for ClickHouseInitData {
     }
 }
 
-// All original functions remain exactly the same
 fn read_string(reader: &mut impl Read) -> io::Result<String> {
     let len = reader.read_u8()?;
     let mut buffer = vec![0; len as usize];
@@ -90,11 +90,17 @@ fn parse_column_type(type_str: &str) -> (ColumnType, Option<String>) {
 
 fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64) -> io::Result<Vec<ColumnData>> {
     let mut data = Vec::with_capacity(rows as usize);
-    for _ in 0..rows {
+    for i in 0..rows {
         let value = match column_type {
+            ColumnType::UInt64 => {
+                let val = reader.read_u64::<LittleEndian>()?;
+                // if i < 5 || i == rows - 1 {
+                //     println!("Row {}: {}", i, val);
+                // }
+                ColumnData::UInt64(val)
+            },
             ColumnType::String => ColumnData::String(read_string(reader)?),
             ColumnType::UInt8 | ColumnType::Enum8 => ColumnData::UInt8(reader.read_u8()?),
-            ColumnType::UInt64 => ColumnData::UInt64(reader.read_u64::<LittleEndian>()?),
             ColumnType::Int => ColumnData::Int(reader.read_i32::<LittleEndian>()?),
             ColumnType::Unsupported(_) => ColumnData::Null,
         };
@@ -105,43 +111,77 @@ fn read_column_data(reader: &mut impl Read, column_type: &ColumnType, rows: u64)
 
 fn read_var_u64(reader: &mut impl Read) -> io::Result<u64> {
     let mut x = 0u64;
-    for i in 0..10 {
+    let mut shift = 0;
+
+    for _ in 0..10 {
         let byte = reader.read_u8()?;
-        x |= ((byte & 0x7F) as u64) << (7 * i);
+        x |= ((byte & 0x7F) as u64) << shift;
+        shift += 7;
         if byte & 0x80 == 0 {
             return Ok(x);
         }
     }
+
     Err(io::Error::new(io::ErrorKind::InvalidData, "Invalid VarUInt"))
 }
 
+fn skip_block_header(reader: &mut BufReader<File>) -> io::Result<()> {
+    // Skip the marker
+    let mut marker = [0u8; 4];
+    reader.read_exact(&mut marker)?;
+
+    // Skip two strings (each prefixed with length)
+    for _ in 0..2 {
+        let str_len = reader.read_u8()? as u64;
+        reader.seek_relative(str_len as i64)?;
+    }
+
+    Ok(())
+}
+
 fn read_native_format(reader: &mut BufReader<File>) -> io::Result<Vec<Column>> {
+    // Read number of columns
     let num_columns = read_var_u64(reader)?;
-    let mut columns: Vec<Column> = Vec::new();
-    let mut is_first_block = true;
+    let mut columns = Vec::new();
+
+    // Read block size
+    let num_rows = read_var_u64(reader)?;
 
+    // Read column definitions and first block data
+    for _ in 0..num_columns {
+        let name = read_string(reader)?;
+        let type_str = read_string(reader)?;
+        let (column_type, type_params) = parse_column_type(&type_str);
+        let data = read_column_data(reader, &column_type, num_rows)?;
+        columns.push(Column { name, type_: column_type, type_params, data });
+    }
+
+    // Read subsequent blocks
     loop {
-        let num_rows = read_var_u64(reader)?;
-
-        if is_first_block {
-            for _ in 0..num_columns {
-                let name = read_string(reader)?;
-                let type_str = read_string(reader)?;
-                let (column_type, type_params) = parse_column_type(&type_str);
-                let data = read_column_data(reader, &column_type, num_rows)?;
-                columns.push(Column { name, type_: column_type, type_params, data });
-            }
-            is_first_block = false;
-        } else {
-            for col in &mut columns {
-                let data = read_column_data(reader, &col.type_, num_rows)?;
-                col.data.extend(data);
-            }
-        }
+        // Try to read number of columns for next block
+        let pos = reader.stream_position()?;
+        let block_columns = match read_var_u64(reader) {
+            Ok(cols) => cols,
+            Err(_) => break, // End of file
+        };
+
+        let block_rows = read_var_u64(reader)?;
 
-        if num_rows < 65409 {
+        if block_rows == 0 {
             break;
         }
+
+        // Skip column definitions for the block (they should match)
+        for _ in 0..block_columns {
+            let _ = read_string(reader)?; // column name
+            let _ = read_string(reader)?; // column type
+        }
+
+        // Read block data
+        for col in &mut columns {
+            let mut new_data = read_column_data(reader, &col.type_, block_rows)?;
+            col.data.append(&mut new_data);
+        }
     }
 
     Ok(columns)
@@ -172,10 +212,12 @@ impl VTab for ClickHouseVTab {
             bind.add_result_column(&column.name, LogicalTypeHandle::from(logical_type));
         }
 
-        // Convert and store filepath
-        let c_filepath = CString::new(filepath)?;
+        let filepath_cstring = CString::new(filepath)?;
+        let raw_ptr = filepath_cstring.as_ptr() as *mut c_char;
+
         unsafe {
-            (*data).filepath = c_filepath.into_raw();
+            (*data).filepath = raw_ptr;
+            (*data)._filepath_holder = Some(filepath_cstring);
         }
 
         Ok(())
@@ -188,7 +230,6 @@ impl VTab for ClickHouseVTab {
         let file = File::open(filepath)?;
         let mut reader = BufReader::with_capacity(64 * 1024, file);
 
-        // Move columns directly into the data structure without intermediate assignment
         let read_result = read_native_format(&mut reader)?;
         let total_rows = if read_result.is_empty() { 0 } else { read_result[0].data.len() };
 
@@ -202,7 +243,6 @@ impl VTab for ClickHouseVTab {
         Ok(())
     }
 
-
     unsafe fn func(func: &FunctionInfo, output: &mut DataChunkHandle) -> Result<(), Box<dyn std::error::Error>> {
         let init_data = func.get_init_data::<ClickHouseInitData>();
 