From 6154d252f4cbf617733c569e71e73226b8598824 Mon Sep 17 00:00:00 2001
From: Aykut Bozkurt
Date: Mon, 23 Sep 2024 12:38:31 +0300
Subject: [PATCH 1/5] declare cargo dependencies

---
 .cargo/config.toml    |  3 +++
 Cargo.toml            | 32 ++++++++++++++++++++------------
 src/bin/pgrx_embed.rs |  2 ++
 3 files changed, 25 insertions(+), 12 deletions(-)
 create mode 100644 src/bin/pgrx_embed.rs

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 13c456b..9ff17f6 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,3 +1,6 @@
 [target.'cfg(target_os="macos")']
 # Postgres symbols won't be available until runtime
 rustflags = ["-Clink-arg=-Wl,-undefined,dynamic_lookup"]
+
+[net]
+git-fetch-with-cli = true
diff --git a/Cargo.toml b/Cargo.toml
index 9ecb826..cacf6f3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,35 +1,43 @@
 [package]
 name = "pg_parquet"
-version = "0.0.0"
+version = "0.1.0"
 edition = "2021"
 
 [lib]
-crate-type = ["cdylib"]
+crate-type = ["cdylib","lib"]
+
+[[bin]]
+name = "pgrx_embed_pg_parquet"
+path = "./src/bin/pgrx_embed.rs"
 
 [features]
 default = ["pg16"]
-# pg11 = ["pgrx/pg11", "pgrx-tests/pg11" ]
-# pg12 = ["pgrx/pg12", "pgrx-tests/pg12" ]
-# pg13 = ["pgrx/pg13", "pgrx-tests/pg13" ]
-# pg14 = ["pgrx/pg14", "pgrx-tests/pg14" ]
-# pg15 = ["pgrx/pg15", "pgrx-tests/pg15" ]
 pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ]
 pg_test = []
 
 [dependencies]
-parquet = { version = "52.0.0", features = [
+arrow = {version = "53", default-features = false}
+arrow-schema = {version = "53", default-features = false}
+dotenvy = "0.15"
+futures = "0.3"
+object_store = {version = "0.11", default-features = false, features = ["aws"]}
+once_cell = "1"
+parquet = {version = "53", default-features = false, features = [
+    "arrow",
     "snap",
     "brotli",
     "flate2",
     "lz4",
     "zstd",
-    "base64"
+    "object_store",
 ]}
-parquet_derive = "52.0.0"
-pgrx = "=0.11.4"
+pgrx = "=0.12.4"
+serde = {version = "1", default-features = false}
+serde_json = "1"
+tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
 
 [dev-dependencies]
-pgrx-tests = "=0.11.4"
+pgrx-tests = "=0.12.4"
 
 [profile.dev]
 panic = "unwind"
diff --git a/src/bin/pgrx_embed.rs b/src/bin/pgrx_embed.rs
new file mode 100644
index 0000000..bf56530
--- /dev/null
+++ b/src/bin/pgrx_embed.rs
@@ -0,0 +1,2 @@
+// magic is required due to PR https://github.com/pgcentralfoundation/pgrx/pull/1468
+::pgrx::pgrx_embed!();

From f20fcd5449faf851b134abd75e6ca36ed96b589a Mon Sep 17 00:00:00 2001
From: Aykut Bozkurt
Date: Mon, 23 Sep 2024 12:38:44 +0300
Subject: [PATCH 2/5] add vscode settings file

---
 .vscode/settings.json | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 0f51e55..be4b716 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -1,4 +1,7 @@
 {
     "editor.formatOnSave": true,
-    "editor.defaultFormatter": "rust-lang.rust-analyzer"
+    "editor.defaultFormatter": "rust-lang.rust-analyzer",
+    "rust-analyzer.check.command": "clippy",
+    "rust-analyzer.checkOnSave": true,
+    "editor.inlayHints.enabled": "offUnlessPressed",
 }
\ No newline at end of file
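Patch 1 makes the crate build both as a cdylib (the loadable extension) and as a plain lib, and registers the pgrx_embed_pg_parquet binary that cargo-pgrx 0.12 invokes for SQL schema generation. As a minimal sketch of what an extension entry point built on these dependencies looks like (illustrative only; pg_parquet_version is a hypothetical function, not part of this series):

    use pgrx::prelude::*;

    pgrx::pg_module_magic!();

    // Hypothetical smoke-test function: confirms the cdylib loads into
    // Postgres and that the declared dependencies link correctly.
    #[pg_extern]
    fn pg_parquet_version() -> &'static str {
        env!("CARGO_PKG_VERSION")
    }

With the binary target in place, `cargo pgrx schema` (or an install/test run) drives the embed binary instead of the older schema-generation trick the linked pgrx PR replaced.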
From 7c4ac3aa2c8a7e6771c3d4b57366a9872b3c1e57 Mon Sep 17 00:00:00 2001
From: Aykut Bozkurt
Date: Mon, 23 Sep 2024 12:39:10 +0300
Subject: [PATCH 3/5] add ci workflow file

---
 .github/workflows/ci.yml | 70 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 70 insertions(+)
 create mode 100644 .github/workflows/ci.yml

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..9d3d14e
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,70 @@
+name: CI lints and tests
+on:
+  push:
+    branches:
+      - "*"
+
+concurrency:
+  group: ${{ github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  build-and-test:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Rust
+        uses: dtolnay/rust-toolchain@stable
+        with:
+          toolchain: 1.81.0
+          target: x86_64-unknown-linux-gnu
+          components: rustfmt, clippy
+
+      - name: Install cargo-llvm-cov for coverage report
+        run: cargo install --locked cargo-llvm-cov@0.6.12
+
+      - name: Install PostgreSQL
+        run: |
+          sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list'
+          wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add -
+          sudo apt-get update
+          sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config
+          sudo apt-get -y install postgresql-16-postgis-3 libpq-dev postgresql-server-dev-16 postgresql-client-16
+
+      - name: Install and configure pgrx
+        run: |
+          cargo install --locked cargo-pgrx@0.12.4
+          cargo pgrx init --pg16 $(which pg_config)
+
+      - name: Format and lint
+        run: |
+          cargo fmt --all -- --check
+          cargo clippy --all-targets --all-features -- -D warnings
+
+      # pgrx tests with runas argument ignores environment variables,
+      # so we need to create a .env file beforehand
+      - name: Create .env file
+        run: |
+          touch /tmp/.env
+          echo AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >> /tmp/.env
+          echo AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env
+          echo AWS_REGION=${{ secrets.AWS_REGION }} >> /tmp/.env
+          echo AWS_S3_TEST_BUCKET=${{ secrets.AWS_S3_TEST_BUCKET }} >> /tmp/.env
+
+      - name: Run tests
+        run: |
+          cargo llvm-cov test --lcov --output-path lcov.info
+        env:
+          RUST_TEST_THREADS: 1
+          CARGO_PGRX_TEST_RUNAS: postgres
+          CARGO_PGRX_TEST_PGDATA: /tmp/pgdata
+
+      - name: Upload coverage report to Codecov
+        uses: codecov/codecov-action@v4
+        with:
+          fail_ci_if_error: true
+          files: ./lcov.info
+          flags: pgrxtests
+          token: ${{ secrets.CODECOV_TOKEN }}

From d20e0219e05a2b7427a23db83fcf85a423b28bb7 Mon Sep 17 00:00:00 2001
From: Aykut Bozkurt
Date: Mon, 23 Sep 2024 12:40:28 +0300
Subject: [PATCH 4/5] update extension control file comment

---
 pg_parquet.control | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pg_parquet.control b/pg_parquet.control
index bfc6efc..9e4511e 100644
--- a/pg_parquet.control
+++ b/pg_parquet.control
@@ -1,4 +1,4 @@
-comment = 'pg_parquet: Created by pgrx'
+comment = 'copy data between Postgres and Parquet'
 default_version = '@CARGO_VERSION@'
 module_pathname = '$libdir/pg_parquet'
 relocatable = false
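The workflow's `cargo llvm-cov test` step runs pgrx's `#[pg_test]` functions inside a temporary Postgres cluster (CARGO_PGRX_TEST_PGDATA) as the postgres user (CARGO_PGRX_TEST_RUNAS), which is why the AWS credentials are staged in /tmp/.env rather than passed through the environment. A minimal sketch of the kind of test this harness executes (illustrative; not a test from this series):

    #[cfg(any(test, feature = "pg_test"))]
    #[pgrx::pg_schema]
    mod tests {
        use pgrx::prelude::*;

        // Runs via SPI inside the temporary test cluster that
        // `cargo pgrx test` (and thus `cargo llvm-cov test`) starts.
        #[pg_test]
        fn test_spi_smoke() {
            let count = Spi::get_one::<i64>("SELECT count(*) FROM (VALUES (1), (2)) t")
                .expect("SPI query failed");
            assert_eq!(count, Some(2));
        }
    }

RUST_TEST_THREADS: 1 matters here because every `#[pg_test]` shares that single cluster.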
From 61f4ad4595bfbaad248b7a05ce1e4fb94678358c Mon Sep 17 00:00:00 2001
From: Aykut Bozkurt
Date: Mon, 23 Sep 2024 12:51:41 +0300
Subject: [PATCH 5/5] add pgrx utils module

---
 src/lib.rs        | 1607 +--------------------------------------------
 src/pgrx_utils.rs |   80 +++
 2 files changed, 87 insertions(+), 1600 deletions(-)
 create mode 100644 src/pgrx_utils.rs

diff --git a/src/lib.rs b/src/lib.rs
index e910f63..797dc88 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,1609 +1,16 @@
-use parquet::data_type::ByteArrayType;
-use parquet::data_type::Int32Type;
-use parquet::file::reader::FileReader;
-use parquet::file::reader::SerializedFileReader;
-use parquet::file::writer::SerializedFileWriter;
-use parquet::record::RowAccessor;
-use parquet::record::{RecordReader, RecordWriter};
-use parquet_derive::{ParquetRecordReader, ParquetRecordWriter};
-
-use pg_sys::{
-    BOOLARRAYOID, BOOLOID, BPCHARARRAYOID, BPCHAROID, CHARARRAYOID, CHAROID, DATEARRAYOID, DATEOID,
-    FLOAT4ARRAYOID, FLOAT4OID, FLOAT8ARRAYOID, FLOAT8OID, INT2ARRAYOID, INT2OID, INT4ARRAYOID,
-    INT4OID, INT8ARRAYOID, INT8OID, INTERVALARRAYOID, INTERVALOID, NUMERICARRAYOID, NUMERICOID,
-    RECORDARRAYOID, RECORDOID, TEXTARRAYOID, TEXTOID, TIMEARRAYOID, TIMEOID, TIMESTAMPARRAYOID,
-    TIMESTAMPOID, TIMESTAMPTZARRAYOID, TIMESTAMPTZOID, TIMETZARRAYOID, TIMETZOID, VARCHARARRAYOID,
-    VARCHAROID,
-};
 use pgrx::prelude::*;
 
 pgrx::pg_module_magic!();
-
-// parquet serializable Person struct with id and name via parquet::record API
-#[derive(Debug, Clone, ParquetRecordReader, ParquetRecordWriter)]
-struct Person {
-    id: i32,
-    name: String,
-}
-
-struct Person2 {
-    id: i32,
-    name: String,
-}
-
-impl RecordReader<Person2> for Vec<Person2> {
-    fn read_from_row_group(
-        &mut self,
-        row_group_reader: &mut dyn parquet::file::reader::RowGroupReader,
-        num_records: usize,
-    ) -> Result<(), parquet::errors::ParquetError> {
-        assert!(num_records == 1);
-
-        let row = row_group_reader
-            .get_row_iter(None)
-            .unwrap()
-            .next()
-            .unwrap()
-            .unwrap();
-
-        let id = row.get_int(0).unwrap();
-        let name = row.get_bytes(1).unwrap().as_utf8().unwrap().to_string();
-        self.push(Person2 { id, name });
-
-        Ok(())
-    }
-}
-
-impl RecordWriter<Person2> for &[Person2] {
-    fn write_to_row_group<W: std::io::Write + Send>(
-        &self,
-        row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<W>,
-    ) -> Result<(), parquet::errors::ParquetError> {
-        assert!(self.len() == 1);
-
-        let person = &self[0];
-
-        let schema = Self::schema(&self).unwrap();
-        let fields = schema.get_fields();
-
-        fields.iter().for_each(|field| {
-            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-            let field_name = field.name();
-            match field_name {
-                "id" => {
-                    column_writer
-                        .typed::<Int32Type>()
-                        .write_batch(&[person.id], None, None)
-                        .unwrap();
-                }
-                "name" => {
-                    column_writer
-                        .typed::<ByteArrayType>()
-                        .write_batch(&[person.name.as_bytes().into()], None, None)
-                        .unwrap();
-                }
-                _ => panic!("unexpected field name"),
-            }
-
-            column_writer.close().unwrap();
-        });
-
-        Ok(())
-    }
-
-    fn schema(&self) -> Result<parquet::schema::types::TypePtr, parquet::errors::ParquetError> {
-        assert!(self.len() == 1);
-
-        let id_type =
-            parquet::schema::types::Type::primitive_type_builder("id", parquet::basic::Type::INT32)
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-        let name_type = parquet::schema::types::Type::primitive_type_builder(
-            "name",
-            parquet::basic::Type::BYTE_ARRAY,
-        )
-        .with_repetition(parquet::basic::Repetition::REQUIRED)
-        .build()
-        .unwrap();
-
-        let person_group_type = parquet::schema::types::Type::group_type_builder("person")
-            .with_fields(vec![id_type.into(), name_type.into()])
-            .build()
-            .unwrap();
-
-        Ok(person_group_type.into())
-    }
-}
-
-//// nested struct
-#[derive(Debug, Clone, Default)]
-struct Cat {
-    id: i32,
-    name: String,
-}
-
-impl RecordReader<Cat> for Vec<Cat> {
-    fn read_from_row_group(
-        &mut self,
-        row_group_reader: &mut dyn parquet::file::reader::RowGroupReader,
-        num_records: usize,
-    ) -> Result<(), parquet::errors::ParquetError> {
-        assert!(num_records == 1);
-
-        let row = row_group_reader
-            .get_row_iter(None)
-            .unwrap()
-            .next()
-            .unwrap()
-            .unwrap();
-
-        let id = row.get_int(0).unwrap();
-        let name = row.get_bytes(1).unwrap().as_utf8().unwrap().to_string();
-
-        self.push(Cat { id, name });
-
-        Ok(())
-    }
-}
-
-impl RecordWriter<Cat> for &[Cat] {
-    fn write_to_row_group<W: std::io::Write + Send>(
-        &self,
-        row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<W>,
-    ) -> Result<(), parquet::errors::ParquetError> {
-        assert!(self.len() == 1);
-
-        let cat = &self[0];
-
-        let schema = Self::schema(&self).unwrap();
-        let fields = schema.get_fields();
-
-        fields.iter().for_each(|field| {
-            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-            let field_name = field.name();
-            match field_name {
-                "id" => {
-                    column_writer
-                        .typed::<Int32Type>()
-                        .write_batch(&[cat.id], None, None)
-                        .unwrap();
-                }
-                "name" => {
-                    column_writer
-                        .typed::<ByteArrayType>()
-                        .write_batch(&[cat.name.as_bytes().into()], None, None)
-                        .unwrap();
-                }
-                _ => panic!("unexpected field name"),
-            }
-
-            column_writer.close().unwrap();
-        });
-
-        Ok(())
-    }
-
-    fn schema(&self) -> Result<parquet::schema::types::TypePtr, parquet::errors::ParquetError> {
-        assert!(self.len() == 1);
-
-        let id_type =
-            parquet::schema::types::Type::primitive_type_builder("id", parquet::basic::Type::INT32)
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-        let name_type = parquet::schema::types::Type::primitive_type_builder(
-            "name",
-            parquet::basic::Type::BYTE_ARRAY,
-        )
-        .with_repetition(parquet::basic::Repetition::REQUIRED)
-        .build()
-        .unwrap();
-
-        let cat_group_type = parquet::schema::types::Type::group_type_builder("cat")
-            .with_fields(vec![id_type.into(), name_type.into()])
-            .build()
-            .unwrap();
-
-        Ok(cat_group_type.into())
-    }
-}
-
-struct NestedPerson {
-    id: i32,
-    name: String,
-    cat: Option<Cat>,
-}
-
-impl RecordReader<NestedPerson> for Vec<NestedPerson> {
-    fn read_from_row_group(
-        &mut self,
-        row_group_reader: &mut dyn parquet::file::reader::RowGroupReader,
-        num_records: usize,
-    ) -> Result<(), parquet::errors::ParquetError> {
-        assert!(num_records == 1);
-
-        let row = row_group_reader
-            .get_row_iter(None)
-            .unwrap()
-            .next()
-            .unwrap()
-            .unwrap();
-
-        let id = row.get_int(0).unwrap();
-        let name = row.get_bytes(1).unwrap().as_utf8().unwrap().to_string();
-        let cat = if let Ok(cat_row) = row.get_group(2) {
-            let cat_id = cat_row.get_int(0).unwrap();
-            let cat_name = cat_row.get_bytes(1).unwrap().as_utf8().unwrap().to_string();
-            let cat = Some(Cat {
-                id: cat_id,
-                name: cat_name,
-            });
-            cat
-        } else {
-            None
-        };
-
-        self.push(NestedPerson { id, name, cat });
-
-        Ok(())
-    }
-}
-
-impl RecordWriter<NestedPerson> for &[NestedPerson] {
-    fn write_to_row_group<W: std::io::Write + Send>(
-        &self,
-        row_group_writer: &mut parquet::file::writer::SerializedRowGroupWriter<W>,
-    ) -> Result<(), parquet::errors::ParquetError> {
-        assert!(self.len() == 1);
-
-        let person = &self[0];
-
-        let schema = Self::schema(&self).unwrap();
-        let fields = schema.get_fields();
-
-        fields.iter().for_each(|field| {
-            let field_name = field.name();
-            match field_name {
-                "id" => {
-                    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                    column_writer
-                        .typed::<Int32Type>()
-                        .write_batch(&[person.id], None, None)
-                        .unwrap();
-
-                    column_writer.close().unwrap();
-                }
-                "name" => {
-                    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                    column_writer
-                        .typed::<ByteArrayType>()
-                        .write_batch(&[person.name.as_bytes().into()], None, None)
-                        .unwrap();
-
-                    column_writer.close().unwrap();
-                }
-                "cat" => {
-                    if let Some(cat) = &person.cat {
-                        let cat_fields = field.get_fields();
-                        cat_fields.iter().for_each(|cat_field| {
-                            let cat_field_name = cat_field.name();
-                            match cat_field_name {
-                                "id" => {
-                                    let mut column_writer =
-                                        row_group_writer.next_column().unwrap().unwrap();
-
-                                    column_writer
-                                        .typed::<Int32Type>()
-                                        .write_batch(&[cat.id], Some(&[1]), None)
-                                        .unwrap();
-
-                                    column_writer.close().unwrap();
-                                }
-                                "name" => {
-                                    let mut column_writer =
-                                        row_group_writer.next_column().unwrap().unwrap();
-
-                                    column_writer
-                                        .typed::<ByteArrayType>()
-                                        .write_batch(
-                                            &[cat.name.as_bytes().into()],
-                                            Some(&[1]),
-                                            None,
-                                        )
-                                        .unwrap();
-
-                                    column_writer.close().unwrap();
-                                }
-                                _ => {
-                                    panic!("unexpected field name");
-                                }
-                            }
-                        });
-                    }
-                }
-                _ => {
-                    panic!("unexpected field name");
-                }
-            }
-        });
-
-        Ok(())
-    }
-
-    fn schema(&self) -> Result<parquet::schema::types::TypePtr, parquet::errors::ParquetError> {
-        assert!(self.len() == 1);
-
-        let id_type =
-            parquet::schema::types::Type::primitive_type_builder("id", parquet::basic::Type::INT32)
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-        let name_type = parquet::schema::types::Type::primitive_type_builder(
-            "name",
-            parquet::basic::Type::BYTE_ARRAY,
-        )
-        .with_repetition(parquet::basic::Repetition::REQUIRED)
-        .build()
-        .unwrap();
-
-        let cat_group_type = parquet::schema::types::Type::group_type_builder("cat")
-            .with_fields(vec![
-                parquet::schema::types::Type::primitive_type_builder(
-                    "id",
-                    parquet::basic::Type::INT32,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap()
-                .into(),
-                parquet::schema::types::Type::primitive_type_builder(
-                    "name",
-                    parquet::basic::Type::BYTE_ARRAY,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap()
-                .into(),
-            ])
-            .with_repetition(parquet::basic::Repetition::OPTIONAL)
-            .build()
-            .unwrap();
-
-        let person_group_type = parquet::schema::types::Type::group_type_builder("person")
-            .with_fields(vec![
-                id_type.into(),
-                name_type.into(),
-                cat_group_type.into(),
-            ])
-            .build()
-            .unwrap();
-
-        Ok(person_group_type.into())
-    }
-}
-
-#[pg_schema]
-mod pgparquet {
-    use parquet::data_type::{
-        ByteArray, DoubleType, FixedLenByteArray, FixedLenByteArrayType, FloatType, Int64Type,
-    };
-    use pg_sys::{JSONOID, UUIDARRAYOID, UUIDOID};
-    use pgrx::{direct_function_call, Json, Uuid};
-
-    use super::*;
-
-    fn serialize_record(tuple: PgHeapTuple<'static, AllocatedByRust>, tuple_name: Option<&str>) {
-        if let Some(tuple_name) = tuple_name {
-            pgrx::info!("\"{}\": ", tuple_name);
-        }
-        pgrx::info!("{{");
-
-        let attributes_len = tuple.len();
-        for (idx, (_, attribute)) in tuple.attributes().enumerate() {
-            let attribute_name = attribute.name();
-            let attribute_oid = attribute.type_oid().value();
-
-            let is_attribute_composite = unsafe { pg_sys::type_is_rowtype(attribute_oid) };
-
-            if is_attribute_composite {
-                let attribute_val = tuple
-                    .get_by_name::<PgHeapTuple<'static, AllocatedByRust>>(attribute_name)
-                    .unwrap()
-                    .unwrap();
-                serialize_record(attribute_val, Some(attribute_name));
-            } else {
-                let attribute_val = tuple
-                    .get_by_name::<pgrx::AnyElement>(attribute_name)
-                    .unwrap()
-                    .unwrap();
-                serialize(attribute_val, Some(attribute_name));
-            }
-
-            if idx < attributes_len - 1 {
-                pgrx::info!(",");
-            }
-        }
-    }
-
-    #[pg_extern]
-    fn serialize(elem: pgrx::AnyElement, elem_name: default!(Option<&str>, "NULL")) {
-        if let Some(elem_name) = elem_name {
-            pgrx::info!("\"{}\": ", elem_name);
-        }
-
-        match elem.oid() {
-            FLOAT4OID => {
-                let value = unsafe {
-                    f32::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                let float4_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::FLOAT,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![float4_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<FloatType>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            FLOAT8OID => {
-                let value = unsafe {
-                    f64::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                let float8_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::DOUBLE,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![float8_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<DoubleType>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            INT2OID => {
-                let value = unsafe {
-                    i16::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                let int2_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT32,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Integer {
-                    bit_width: 16,
-                    is_signed: true,
-                }))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![int2_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int32Type>()
-                    .write_batch(&[value as i32], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            INT4OID => {
-                let value = unsafe {
-                    i32::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                let int4_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT32,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![int4_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int32Type>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            INT8OID => {
-                let value = unsafe {
-                    i64::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                let int8_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT64,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![int8_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int64Type>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            NUMERICOID => {
-                // todo: totally wrong
-                let value = unsafe {
-                    AnyNumeric::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                let numeric_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::numeric_send, &[value.clone().into_datum()])
-                        .unwrap()
-                };
-                let numeric_as_bytes: ByteArray = numeric_as_bytes.as_slice().into();
 
+mod pgrx_utils;
 
-                // find scale and precision
-                let numeric_as_str: &core::ffi::CStr = unsafe {
-                    direct_function_call(pg_sys::numeric_out, &[value.into_datum()]).unwrap()
-                };
-                let numeric_as_str = numeric_as_str.to_str().unwrap();
-                let numeric_split: Vec<&str> = numeric_as_str.split('.').collect();
-                let first_part_len = numeric_split[0].len() as i32;
-                let second_part_len = if numeric_split.len() > 1 {
-                    numeric_split[1].len() as i32
-                } else {
-                    0
-                };
-                let precision = first_part_len + second_part_len;
-                let scale = second_part_len;
-
-                let numeric_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::BYTE_ARRAY,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Decimal {
-                    precision,
-                    scale,
-                }))
-                .with_precision(precision)
-                .with_scale(scale)
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![numeric_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<ByteArrayType>()
-                    .write_batch(&[numeric_as_bytes], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            DATEOID => {
-                let value = unsafe {
-                    Date::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                // PG epoch is (2000-01-01). Convert it to Unix epoch (1970-01-01). +10957 days
-                let value: Date = unsafe {
-                    direct_function_call(
-                        pg_sys::date_pli,
-                        &[value.into_datum(), 10957.into_datum()],
-                    )
-                    .unwrap()
-                };
-
-                let date_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::date_send, &[value.into_datum()]).unwrap()
-                };
-                let date_as_int = i32::from_be_bytes(date_as_bytes[0..4].try_into().unwrap());
-
-                let date_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT32,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Date))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![date_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int32Type>()
-                    .write_batch(&[date_as_int], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            TIMESTAMPOID => {
-                let value =
-                    unsafe { Timestamp::from_polymorphic_datum(elem.datum(), false, elem.oid()) }
-                        .unwrap();
-
-                // PG epoch is (2000-01-01). Convert it to Unix epoch (1970-01-01). +10957 days
-                let adjustment_interval = Interval::from_days(10957);
-                let value: Timestamp = unsafe {
-                    direct_function_call(
-                        pg_sys::timestamp_pl_interval,
-                        &[value.into_datum(), adjustment_interval.into_datum()],
-                    )
-                    .unwrap()
-                };
-
-                let timestamp_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::time_send, &[value.into_datum()]).unwrap()
-                };
-                let timestamp_val =
-                    i64::from_be_bytes(timestamp_as_bytes[0..8].try_into().unwrap());
-
-                let timestamp_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT64,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Timestamp {
-                    is_adjusted_to_u_t_c: false,
-                    unit: parquet::basic::TimeUnit::MICROS(parquet::format::MicroSeconds {}),
-                }))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![timestamp_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int64Type>()
-                    .write_batch(&[timestamp_val], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            TIMESTAMPTZOID => {
-                let value = unsafe {
-                    TimestampWithTimeZone::from_polymorphic_datum(elem.datum(), false, elem.oid())
-                }
-                .unwrap();
-
-                // PG epoch is (2000-01-01). Convert it to Unix epoch (1970-01-01). +10957 days
-                let adjustment_interval = Interval::from_days(10957);
-                let value: TimestampWithTimeZone = unsafe {
-                    direct_function_call(
-                        pg_sys::timestamptz_pl_interval,
-                        &[value.into_datum(), adjustment_interval.into_datum()],
-                    )
-                    .unwrap()
-                };
-
-                let timestamp_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::timestamptz_send, &[value.into_datum()]).unwrap()
-                };
-                let timestamp_val =
-                    i64::from_be_bytes(timestamp_as_bytes[0..8].try_into().unwrap());
-
-                let timestamp_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT64,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Timestamp {
-                    is_adjusted_to_u_t_c: true,
-                    unit: parquet::basic::TimeUnit::MICROS(parquet::format::MicroSeconds {}),
-                }))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![timestamp_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int64Type>()
-                    .write_batch(&[timestamp_val], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            TIMEOID => {
-                let value =
-                    unsafe { Time::from_polymorphic_datum(elem.datum(), false, elem.oid()) }
-                        .unwrap();
-
-                let time_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::time_send, &[value.into_datum()]).unwrap()
-                };
-                let time_val = i64::from_be_bytes(time_as_bytes[0..8].try_into().unwrap());
-
-                let time_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT64,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Time {
-                    is_adjusted_to_u_t_c: false,
-                    unit: parquet::basic::TimeUnit::MICROS(parquet::format::MicroSeconds {}),
-                }))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![time_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int64Type>()
-                    .write_batch(&[time_val], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            TIMETZOID => {
-                let value = unsafe {
-                    TimeWithTimeZone::from_polymorphic_datum(elem.datum(), false, elem.oid())
-                }
-                .unwrap();
-
-                // extract timezone as seconds
-                let timezone_as_secs: AnyNumeric = unsafe {
-                    direct_function_call(
-                        pg_sys::extract_timetz,
-                        &["timezone".into_datum(), value.into_datum()],
-                    )
-                }
-                .unwrap();
-
-                // adjust timezone
-                let timezone_as_secs: f64 = timezone_as_secs.try_into().unwrap();
-                let timezone_val = Interval::from_seconds(timezone_as_secs);
-                let value: TimeWithTimeZone = unsafe {
-                    direct_function_call(
-                        pg_sys::timetz_pl_interval,
-                        &[value.into_datum(), timezone_val.into_datum()],
-                    )
-                    .unwrap()
-                };
-
-                let time_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::timetz_send, &[value.into_datum()]).unwrap()
-                };
-                let time_val = i64::from_be_bytes(time_as_bytes[0..8].try_into().unwrap());
-
-                let time_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT64,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Time {
-                    is_adjusted_to_u_t_c: true,
-                    unit: parquet::basic::TimeUnit::MICROS(parquet::format::MicroSeconds {}),
-                }))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![time_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int64Type>()
-                    .write_batch(&[time_val], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            INTERVALOID => {
-                let value =
-                    unsafe { Interval::from_polymorphic_datum(elem.datum(), false, elem.oid()) }
-                        .unwrap();
-
-                let interval_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::interval_send, &[value.into_datum()]).unwrap()
-                };
-
-                // first 8 bytes: time in microsec
-                // next 4 bytes: day
-                // next 4 bytes: month
-                let time_in_microsec_val =
-                    i64::from_be_bytes(interval_as_bytes[0..8].try_into().unwrap());
-                let day_val = i32::from_be_bytes(interval_as_bytes[8..12].try_into().unwrap());
-                let month_val = i32::from_be_bytes(interval_as_bytes[12..16].try_into().unwrap());
-
-                // Postgres interval has microsecond resolution, parquet only milliseconds
-                // plus postgres doesn't overflow the seconds into the day field
-                let ms_per_day = 1000 * 60 * 60 * 24;
-                let millis_total = time_in_microsec_val / 1000;
-                let days = millis_total / ms_per_day;
-                let millis = millis_total % ms_per_day;
-                let mut b = vec![0u8; 12];
-                b[0..4].copy_from_slice(&i32::to_le_bytes(month_val));
-                b[4..8].copy_from_slice(&i32::to_le_bytes(day_val + days as i32));
-                b[8..12].copy_from_slice(&i32::to_le_bytes(millis as i32));
-                let interval_as_bytes = FixedLenByteArray::from(b);
-
-                let interval_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::FIXED_LEN_BYTE_ARRAY,
-                )
-                .with_length(12)
-                .with_converted_type(parquet::basic::ConvertedType::INTERVAL)
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![interval_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<FixedLenByteArrayType>()
-                    .write_batch(&[interval_as_bytes], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            CHAROID => {
-                let value =
-                    unsafe { i8::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap() };
-                let value: ByteArray = vec![value as u8].into();
-
-                let char_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::BYTE_ARRAY,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![char_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<ByteArrayType>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            TEXTOID | VARCHAROID | BPCHAROID => {
-                let value = unsafe {
-                    String::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                let value: ByteArray = value.as_bytes().into();
-
-                let text_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::BYTE_ARRAY,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::String))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![text_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<ByteArrayType>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            BOOLOID => {
-                let value = unsafe {
-                    bool::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                let bool_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::INT32,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Integer {
-                    bit_width: 8,
-                    is_signed: true,
-                }))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![bool_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<Int32Type>()
-                    .write_batch(&[value as i32], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            UUIDOID => {
-                let value = unsafe {
-                    Uuid::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                let uuid_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::uuid_send, &[value.into_datum()]).unwrap()
-                };
-                let value: FixedLenByteArray = uuid_as_bytes.into();
-
-                let uuid_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::FIXED_LEN_BYTE_ARRAY,
-                )
-                .with_length(16)
-                .with_logical_type(Some(parquet::basic::LogicalType::Uuid))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![uuid_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<FixedLenByteArrayType>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            JSONOID => {
-                let value = unsafe {
-                    Json::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                let json_as_bytes: Vec<u8> = unsafe {
-                    direct_function_call(pg_sys::json_send, &[value.into_datum()]).unwrap()
-                };
-                let value: ByteArray = json_as_bytes.into();
-
-                let json_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::BYTE_ARRAY,
-                )
-                .with_logical_type(Some(parquet::basic::LogicalType::Json))
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let row_group_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![json_type.into()])
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, row_group_type.into(), Default::default())
-                        .unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                column_writer
-                    .typed::<ByteArrayType>()
-                    .write_batch(&[value], None, None)
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            RECORDOID => {
-                let record = unsafe {
-                    PgHeapTuple::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                serialize_record(record, None);
-            }
-            FLOAT4ARRAYOID => {
-                let value = unsafe {
-                    Vec::<f32>::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-
-                // write to parquet as list
-                let float4_type = parquet::schema::types::Type::primitive_type_builder(
-                    elem_name.unwrap(),
-                    parquet::basic::Type::FLOAT,
-                )
-                .with_repetition(parquet::basic::Repetition::REQUIRED)
-                .build()
-                .unwrap();
-
-                let list_type = parquet::schema::types::Type::group_type_builder("list")
-                    .with_fields(vec![float4_type.into()])
-                    .with_repetition(parquet::basic::Repetition::REPEATED)
-                    .with_logical_type(Some(parquet::basic::LogicalType::List))
-                    .build()
-                    .unwrap();
-
-                let list_type = parquet::schema::types::Type::group_type_builder("root")
-                    .with_fields(vec![list_type.into()])
-                    .with_repetition(parquet::basic::Repetition::REQUIRED)
-                    .build()
-                    .unwrap();
-
-                let file = std::fs::OpenOptions::new()
-                    .write(true)
-                    .create(true)
-                    .open("/home/aykutbozkurt/Projects/pg_parquet/test.parquet")
-                    .unwrap();
-
-                let mut writer =
-                    SerializedFileWriter::new(file, list_type.into(), Default::default()).unwrap();
-                let mut row_group_writer = writer.next_row_group().unwrap();
-                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
-
-                let def_levels = vec![1; value.len()];
-                let mut rep_levels = vec![1; value.len()];
-                rep_levels[0] = 0;
-                column_writer
-                    .typed::<FloatType>()
-                    .write_batch(&value, Some(&def_levels), Some(&rep_levels))
-                    .unwrap();
-
-                column_writer.close().unwrap();
-                row_group_writer.close().unwrap();
-                writer.close().unwrap();
-            }
-            FLOAT8ARRAYOID => {
-                let value = unsafe {
-                    Vec::<f64>::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            INT2ARRAYOID => {
-                let value = unsafe {
-                    Vec::<i16>::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            INT4ARRAYOID => {
-                let value = unsafe {
-                    Vec::<i32>::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            INT8ARRAYOID => {
-                let value = unsafe {
-                    Vec::<i64>::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            NUMERICARRAYOID => {
-                let value = unsafe {
-                    Vec::<AnyNumeric>::from_polymorphic_datum(elem.datum(), false, elem.oid())
-                        .unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            DATEARRAYOID => {
-                let value = unsafe {
-                    Vec::<Date>::from_polymorphic_datum(elem.datum(), false, elem.oid()).unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            TIMESTAMPARRAYOID => {
-                let value = unsafe {
-                    Vec::<Timestamp>::from_polymorphic_datum(elem.datum(), false, elem.oid())
-                        .unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            TIMESTAMPTZARRAYOID => {
-                let value = unsafe {
-                    Vec::<TimestampWithTimeZone>::from_polymorphic_datum(
-                        elem.datum(),
-                        false,
-                        elem.oid(),
-                    )
-                    .unwrap()
-                };
-                pgrx::info!("{:?}", value);
-            }
-            TIMEARRAYOID => {
-                let value = unsafe {
-                    Vec::