diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index a53c4b2..aeaf2e3 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -6,10 +6,11 @@ ENV TZ="Europe/Istanbul" ARG PG_MAJOR=17 # install deps -RUN apt-get update && apt-get -y install build-essential libreadline-dev zlib1g-dev \ - flex bison libxml2-dev libxslt-dev libssl-dev \ - libxml2-utils xsltproc ccache pkg-config wget \ - curl lsb-release sudo nano net-tools git awscli +RUN apt-get update && apt-get -y install build-essential libreadline-dev zlib1g-dev \ + flex bison libxml2-dev libxslt-dev libssl-dev \ + libxml2-utils xsltproc ccache pkg-config wget \ + curl lsb-release ca-certificates gnupg sudo git \ + nano net-tools awscli # install Postgres RUN sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' @@ -19,6 +20,14 @@ RUN apt-get update && apt-get -y install postgresql-${PG_MAJOR}-postgis-3 \ postgresql-client-${PG_MAJOR} \ libpq-dev +# install azure-cli and azurite +RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - +RUN apt-get update && apt-get install -y nodejs +RUN curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/microsoft.gpg > /dev/null +RUN echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/trusted.gpg.d/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | tee /etc/apt/sources.list.d/azure-cli.list +RUN apt-get update && apt-get install -y azure-cli +RUN npm install -g azurite + # download and install MinIO server and client RUN wget https://dl.min.io/server/minio/release/linux-amd64/minio RUN chmod +x minio @@ -58,11 +67,3 @@ ARG PGRX_VERSION=0.12.6 RUN cargo install --locked cargo-pgrx@${PGRX_VERSION} RUN cargo pgrx init --pg${PG_MAJOR} $(which pg_config) RUN echo "shared_preload_libraries = 'pg_parquet'" >> $HOME/.pgrx/data-${PG_MAJOR}/postgresql.conf - -ENV MINIO_ROOT_USER=admin -ENV MINIO_ROOT_PASSWORD=admin123 -ENV AWS_S3_TEST_BUCKET=testbucket -ENV AWS_REGION=us-east-1 -ENV AWS_ACCESS_KEY_ID=admin -ENV AWS_SECRET_ACCESS_KEY=admin123 -ENV PG_PARQUET_TEST=true diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index efdd2d9..a81ca9e 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -15,7 +15,7 @@ ] } }, - "postStartCommand": "bash .devcontainer/scripts/setup-minio.sh", + "postStartCommand": "bash .devcontainer/scripts/setup_minio.sh && bash .devcontainer/scripts/setup_azurite.sh", "forwardPorts": [ 5432 ], diff --git a/.devcontainer/scripts/setup_azurite.sh b/.devcontainer/scripts/setup_azurite.sh new file mode 100644 index 0000000..cea6712 --- /dev/null +++ b/.devcontainer/scripts/setup_azurite.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +source setup_test_envs.sh + +nohup azurite --location /tmp/azurite-storage > /dev/null 2>&1 & + +az storage container create --name "${AZURE_TEST_CONTAINER_NAME}" --public off --connection-string "$AZURE_STORAGE_CONNECTION_STRING" diff --git a/.devcontainer/scripts/setup-minio.sh b/.devcontainer/scripts/setup_minio.sh similarity index 88% rename from .devcontainer/scripts/setup-minio.sh rename to .devcontainer/scripts/setup_minio.sh index d611b70..627e9c0 100644 --- a/.devcontainer/scripts/setup-minio.sh +++ b/.devcontainer/scripts/setup_minio.sh @@ -1,5 +1,7 @@ #!/bin/bash +source setup_test_envs.sh + nohup minio server /tmp/minio-storage > /dev/null 2>&1 & mc alias set local http://localhost:9000 
$MINIO_ROOT_USER $MINIO_ROOT_PASSWORD diff --git a/.devcontainer/scripts/setup_test_envs.sh b/.devcontainer/scripts/setup_test_envs.sh new file mode 100644 index 0000000..da04a64 --- /dev/null +++ b/.devcontainer/scripts/setup_test_envs.sh @@ -0,0 +1,15 @@ +# S3 tests +export AWS_ACCESS_KEY_ID=admin +export AWS_SECRET_ACCESS_KEY=admin123 +export AWS_REGION=us-east-1 +export AWS_S3_TEST_BUCKET=testbucket +export MINIO_ROOT_USER=admin +export MINIO_ROOT_PASSWORD=admin123 + +# Azure Blob tests +export AZURE_TEST_CONTAINER_NAME=testcontainer +export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;" + +# Other +export PG_PARQUET_TEST=true +export RUST_TEST_THREADS=1 diff --git a/.env_sample b/.env_sample deleted file mode 100644 index c14097f..0000000 --- a/.env_sample +++ /dev/null @@ -1,5 +0,0 @@ -AWS_S3_TEST_BUCKET=testbucket -AWS_REGION=us-east-1 -AWS_ACCESS_KEY_ID=admin -AWS_SECRET_ACCESS_KEY=admin123 -PG_PARQUET_TEST=true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9c97fcc..2a0aae4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,8 +1,9 @@ name: CI lints and tests on: push: - branches: - - "*" + branches: [ "main" ] + pull_request: + branches: [ "main" ] concurrency: group: ${{ github.ref }} @@ -69,12 +70,23 @@ jobs: sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update - sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config + sudo apt-get -y install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev \ + libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config \ + gnupg ca-certificates sudo apt-get -y install postgresql-${{ env.PG_MAJOR }}-postgis-3 \ postgresql-server-dev-${{ env.PG_MAJOR }} \ postgresql-client-${{ env.PG_MAJOR }} \ libpq-dev + - name: Install Azurite + run: | + curl -fsSL https://deb.nodesource.com/setup_20.x | sudo bash - + sudo apt-get update && sudo apt-get install -y nodejs + curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/microsoft.gpg > /dev/null + echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/trusted.gpg.d/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | sudo tee /etc/apt/sources.list.d/azure-cli.list + sudo apt-get update && sudo apt-get install -y azure-cli + npm install -g azurite + - name: Install MinIO run: | # Download and install MinIO server and client @@ -107,23 +119,14 @@ jobs: $(pg_config --sharedir)/extension \ /var/run/postgresql/ - # pgrx tests with runas argument ignores environment variables, so - # we read env vars from .env file in tests (https://github.com/pgcentralfoundation/pgrx/pull/1674) - touch /tmp/.env - echo AWS_ACCESS_KEY_ID=${{ env.AWS_ACCESS_KEY_ID }} >> /tmp/.env - echo AWS_SECRET_ACCESS_KEY=${{ env.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env - echo AWS_S3_TEST_BUCKET=${{ env.AWS_S3_TEST_BUCKET }} >> /tmp/.env - echo AWS_REGION=${{ env.AWS_REGION }} >> /tmp/.env - echo PG_PARQUET_TEST=${{ env.PG_PARQUET_TEST }} >> /tmp/.env + # Set up test environments + source 
.devcontainer/scripts/setup_test_envs.sh # Start MinIO server - export MINIO_ROOT_USER=${{ env.AWS_ACCESS_KEY_ID }} - export MINIO_ROOT_PASSWORD=${{ env.AWS_SECRET_ACCESS_KEY }} - minio server /tmp/minio-storage > /dev/null 2>&1 & + bash .devcontainer/scripts/setup_minio.sh - # Set access key and create test bucket - mc alias set local http://localhost:9000 ${{ env.AWS_ACCESS_KEY_ID }} ${{ env.AWS_SECRET_ACCESS_KEY }} - aws --endpoint-url http://localhost:9000 s3 mb s3://${{ env.AWS_S3_TEST_BUCKET }} + # Start Azurite server + bash .devcontainer/scripts/setup_azurite.sh # Run tests with coverage tool source <(cargo llvm-cov show-env --export-prefix) @@ -134,13 +137,9 @@ jobs: # Stop MinIO server pkill -9 minio - env: - RUST_TEST_THREADS: 1 - AWS_ACCESS_KEY_ID: test_secret_access_key - AWS_SECRET_ACCESS_KEY: test_access_key_id - AWS_REGION: us-east-1 - AWS_S3_TEST_BUCKET: testbucket - PG_PARQUET_TEST: true + + # Stop Azurite server + pkill -9 node - name: Upload coverage report to Codecov if: ${{ env.PG_MAJOR }} == 17 diff --git a/Cargo.lock b/Cargo.lock index 63a1bfc..50d998f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -982,12 +982,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dotenvy" -version = "0.15.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" - [[package]] name = "either" version = "1.13.0" @@ -2088,7 +2082,6 @@ dependencies = [ "arrow-schema", "aws-config", "aws-credential-types", - "dotenvy", "futures", "object_store", "once_cell", diff --git a/Cargo.toml b/Cargo.toml index 60ff94e..8219a5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,8 @@ arrow = {version = "53", default-features = false} arrow-schema = {version = "53", default-features = false} aws-config = { version = "1.5", default-features = false, features = ["rustls"]} aws-credential-types = {version = "1.2", default-features = false} -dotenvy = "0.15" futures = "0.3" -object_store = {version = "0.11", default-features = false, features = ["aws"]} +object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]} once_cell = "1" parquet = {version = "53", default-features = false, features = [ "arrow", diff --git a/README.md b/README.md index b411d45..3934e5c 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,13 @@ You can call `SELECT * FROM parquet.file_metadata()` to discover file level You can call `SELECT * FROM parquet.kv_metadata()` to query custom key-value metadata of the Parquet file at given uri. ## Object Store Support -`pg_parquet` supports reading and writing Parquet files from/to `S3` object store. Only the uris with `s3://` scheme is supported. +`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores. + +> [!NOTE] +> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. +> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. 
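For illustration, a minimal grant-and-copy sequence could look like the sketch below; the role `my_user`, the bucket `testbucket`, and the table name are assumptions for the example, not names taken from this repository:

```sql
-- as a superuser: allow my_user to read from and write to object store uris
GRANT parquet_object_store_read TO my_user;
GRANT parquet_object_store_write TO my_user;

-- as my_user: export a table to Parquet in S3, then load it back
COPY my_table TO 's3://testbucket/my_table.parquet';
COPY my_table FROM 's3://testbucket/my_table.parquet';
```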
+ +#### S3 Storage The simplest way to configure object storage is by creating the standard `~/.aws/credentials` and `~/.aws/config` files: @@ -129,9 +135,20 @@ Alternatively, you can use the following environment variables when starting pos - `AWS_CONFIG_FILE`: an alternative location for the config file - `AWS_PROFILE`: the name of the profile from the credentials and config file (default profile name is `default`) -> [!NOTE] -> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. -> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. +Supported S3 uri formats are shown below: +- s3:// \ / \ +- s3a:// \ / \ +- https:// \.s3.amazonaws.com / \ +- https:// s3.amazonaws.com / \ / \ + +#### Azure Blob Storage + +You can use the following environment variables when starting postgres to configure the Azure Blob Storage client: +- `AZURE_STORAGE_ACCOUNT_KEY`: the storage account key of the Azure Blob +- `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob + +Supported Azure Blob Storage uri formats are shown below: +- https:// \.blob.core.windows.net / \ / \ ## Copy Options `pg_parquet` supports the following options in the `COPY TO` command: diff --git a/src/arrow_parquet/uri_utils.rs b/src/arrow_parquet/uri_utils.rs index 745058f..0abf10c 100644 --- a/src/arrow_parquet/uri_utils.rs +++ b/src/arrow_parquet/uri_utils.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, sync::LazyLock}; +use std::{ + panic, + sync::{Arc, LazyLock}, +}; use arrow::datatypes::SchemaRef; use aws_config::{ @@ -9,9 +12,10 @@ use aws_config::{ use aws_credential_types::provider::ProvideCredentials; use object_store::{ aws::{AmazonS3, AmazonS3Builder}, + azure::{MicrosoftAzure, MicrosoftAzureBuilder}, local::LocalFileSystem, path::Path, - ObjectStore, + ObjectStore, ObjectStoreScheme, }; use parquet::{ arrow::{ @@ -44,64 +48,130 @@ pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock = LazyLock::new(|| .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)) }); -fn parse_bucket_and_key(uri: &Url) -> (String, String) { - debug_assert!(uri.scheme() == "s3"); +fn parse_azure_blob_container(uri: &Url) -> Option { + let host = uri.host_str()?; - let bucket = uri - .host_str() - .unwrap_or_else(|| panic!("bucket not found in uri: {}", uri)); + // https://{account}.blob.core.windows.net/{container}/key + if host.ends_with("blob.core.windows.net") { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); - let key = uri.path(); + if !path_segments.is_empty() { + return Some(path_segments[0].to_string()); + } else { + return None; + } + } + // http://localhost:10000/{account}/{container}/key + else if is_testing() && uri.scheme() == "http" { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); + + if path_segments.len() >= 2 { + return Some(path_segments[1].to_string()); + } else { + return None; + } + } - (bucket.to_string(), key.to_string()) + None +} + +fn parse_s3_bucket(uri: &Url) -> Option { + let host = uri.host_str()?; + + // s3(a)://{bucket}/key + if uri.scheme() == "s3" || uri.scheme() == "s3a" { + return Some(host.to_string()); + } + // https://s3.amazonaws.com/{bucket}/key + else if host == "s3.amazonaws.com" { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); + if !path_segments.is_empty() { + return Some(path_segments[0].to_string()); // Bucket name is the first part of the path + } else { + return None; + 
} + } + // https://{bucket}.s3.amazonaws.com/key + else if host.ends_with("s3.amazonaws.com") { + let bucket_name = host.split('.').next()?; + return Some(bucket_name.to_string()); + } + + None } fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc, Path) { - if uri.scheme() == "s3" { - ensure_object_store_access_privilege(copy_from); + let (scheme, path) = + ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); - let (bucket_name, key) = parse_bucket_and_key(uri); + match scheme { + ObjectStoreScheme::AmazonS3 => { + ensure_object_store_access_privilege(copy_from); - let storage_container = PG_BACKEND_TOKIO_RUNTIME - .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); + let bucket_name = parse_s3_bucket(uri).unwrap_or_else(|| { + panic!("failed to parse bucket name from uri: {}", uri); + }); - let location = Path::from(key); + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); - (storage_container, location) - } else { - debug_assert!(uri.scheme() == "file"); + (storage_container, path) + } + ObjectStoreScheme::MicrosoftAzure => { + ensure_object_store_access_privilege(copy_from); + + let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| { + panic!("failed to parse container name from uri: {}", uri); + }); + + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_azure_object_store(&container_name).await) }); - ensure_local_file_access_privilege(copy_from); + (storage_container, path) + } + ObjectStoreScheme::Http if is_testing() => { + ensure_object_store_access_privilege(copy_from); + + let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| { + panic!("failed to parse container name from uri: {}", uri); + }); - let uri = uri_as_string(uri); + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_azure_object_store(&container_name).await) }); - if !copy_from { - // create or overwrite the local file - std::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(&uri) - .unwrap_or_else(|e| panic!("{}", e)); + (storage_container, path) } + ObjectStoreScheme::Local => { + ensure_local_file_access_privilege(copy_from); + + let uri = uri_as_string(uri); + + if !copy_from { + // create or overwrite the local file + std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(&uri) + .unwrap_or_else(|e| panic!("{}", e)); + } - let storage_container = Arc::new(LocalFileSystem::new()); + let storage_container = Arc::new(LocalFileSystem::new()); - let location = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); + let path = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); - (storage_container, location) + (storage_container, path) + } + _ => { + panic!("unsupported uri {}", uri); + } } } async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { - // try loading environment vars from the .env file - dotenvy::from_path("/tmp/.env").ok(); - let mut aws_s3_builder = AmazonS3Builder::new().with_bucket_name(bucket_name); - let is_test_running = std::env::var("PG_PARQUET_TEST").is_ok(); - - if is_test_running { + if is_testing() { // use minio for testing aws_s3_builder = aws_s3_builder.with_endpoint("http://localhost:9000"); aws_s3_builder = aws_s3_builder.with_allow_http(true); @@ -146,6 +216,21 @@ async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { 
aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e)) } +async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure { + let mut azure_builder = MicrosoftAzureBuilder::from_env().with_container_name(container_name); + + if is_testing() { + // use azurite for testing + azure_builder = azure_builder.with_use_emulator(true); + } + + azure_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} + +fn is_testing() -> bool { + std::env::var("PG_PARQUET_TEST").is_ok() +} + pub(crate) fn parse_uri(uri: &str) -> Url { if !uri.contains("://") { // local file @@ -155,12 +240,25 @@ pub(crate) fn parse_uri(uri: &str) -> Url { let uri = Url::parse(uri).unwrap_or_else(|e| panic!("{}", e)); - if uri.scheme() != "s3" { + let (scheme, _) = + ObjectStoreScheme::parse(&uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); + + if scheme == ObjectStoreScheme::AmazonS3 { + parse_s3_bucket(&uri) + .unwrap_or_else(|| panic!("failed to parse bucket name from s3 uri {}", uri)); + } else if scheme == ObjectStoreScheme::MicrosoftAzure || scheme == ObjectStoreScheme::Http { + parse_azure_blob_container(&uri).unwrap_or_else(|| { + panic!( + "failed to parse container name from azure blob storage uri {}", + uri + ) + }); + } else { panic!( - "unsupported uri {}. Only local files and URIs with s3:// prefix are supported.", + "unsupported uri {}. Only Azure and S3 uris are supported.", uri ); - } + }; uri } diff --git a/src/lib.rs b/src/lib.rs index fdba58c..77e2db1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1197,24 +1197,142 @@ mod tests { } #[pg_test] - fn test_s3_object_store_from_env() { - dotenvy::from_path("/tmp/.env").unwrap(); + fn test_azure_blob_with_storage_key() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); - let test_bucket_name: String = - std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + let azure_blob_uris = [format!( + "http://localhost:10000/devstoreaccount1/{}/pg_parquet_test.parquet", + test_container_name + )]; - let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + for azure_blob_uri in azure_blob_uris { + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); - let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_blob_with_wrong_storage_key() { + let wrong_account_key = String::from("FFy8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="); + std::env::set_var("AZURE_STORAGE_ACCOUNT_KEY", wrong_account_key); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let azure_blob_uris = [format!( + "http://localhost:10000/devstoreaccount1/{}/pg_parquet_test.parquet", + test_container_name + )]; + + for azure_blob_uri in azure_blob_uris { + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + } + + #[pg_test] + #[should_panic(expected = "404 Not Found")] + fn test_azure_blob_write_wrong_container() { + let azure_blob_uri = + "http://localhost:10000/devstoreaccount1/nonexistentcontainer/pg_parquet_test.parquet"; + + let copy_to_command = format!( + "COPY (SELECT i FROM 
generate_series(1,10) i) TO '{}';", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + fn test_azure_blob_read_write_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let read_write_sas_token = String::from("se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"); + + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_write_sas_token); + + let azure_blob_uri = format!( + "http://localhost:10000/devstoreaccount1/{}/pg_parquet_test.parquet", + test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_blob_read_only_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let read_only_sas_token = String::from( + "se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D", + ); + + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_only_sas_token); + + let azure_blob_uri = format!( + "http://localhost:10000/devstoreaccount1/{}/pg_parquet_test.parquet", + test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "failed to parse container name")] + fn test_azure_blob_unsupported_uri() { + let fabric_azure_blob_uri = "https://ACCOUNT.dfs.fabric.microsoft.com".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(fabric_azure_blob_uri); test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_helper(test_table); } #[pg_test] - fn test_s3_object_store_from_config_file() { - dotenvy::from_path("/tmp/.env").unwrap(); + fn test_s3_from_env() { + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uris = [ + format!("s3://{}/pg_parquet_test.parquet", test_bucket_name), + format!("s3a://{}/pg_parquet_test.parquet", test_bucket_name), + format!( + "https://s3.amazonaws.com/{}/pg_parquet_test.parquet", + test_bucket_name + ), + format!( + "https://{}.s3.amazonaws.com/pg_parquet_test.parquet", + test_bucket_name + ), + ]; + + for s3_uri in s3_uris { + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + } + #[pg_test] + fn test_s3_from_config_file() { let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -1255,6 +1373,38 @@ mod tests { test_helper(test_table); } + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_with_wrong_access_key_id() { + std::env::set_var("AWS_ACCESS_KEY_ID", "wrong_access_key_id"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn 
test_s3_with_wrong_secret_access_key() { + std::env::set_var("AWS_SECRET_ACCESS_KEY", "wrong_secret_access_key"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + #[pg_test] #[should_panic(expected = "permission denied to COPY from a remote uri")] fn test_s3_no_read_access() { @@ -1270,8 +1420,6 @@ mod tests { // set the current user to the regular user Spi::run("SET SESSION AUTHORIZATION regular_user;").unwrap(); - dotenvy::from_path("/tmp/.env").unwrap(); - let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -1312,8 +1460,6 @@ mod tests { // set the current user to the regular user Spi::run("SET SESSION AUTHORIZATION regular_user;").unwrap(); - dotenvy::from_path("/tmp/.env").unwrap(); - let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -1341,7 +1487,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_write_invalid_uri() { + fn test_s3_write_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let copy_to_command = format!( @@ -1353,7 +1499,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_read_invalid_uri() { + fn test_s3_read_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let create_table_command = "CREATE TABLE test_table (a int);"; @@ -1363,6 +1509,17 @@ mod tests { Spi::run(copy_from_command.as_str()).unwrap(); } + #[pg_test] + #[should_panic(expected = "failed to parse bucket name")] + fn test_s3_unsupported_uri() { + let cloudflare_s3_uri = "https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(cloudflare_s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + #[pg_test] #[should_panic(expected = "unsupported uri gs://testbucket")] fn test_unsupported_uri() {