From 2feb683a10109c3d82d547122ed4613c7cc7a1e1 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Wed, 23 Oct 2024 20:46:29 +0300 Subject: [PATCH] Adds support for COPY TO/FROM Azure Blob Storage Supports following Azure Blob uri forms: - `az://{container}/key` - `azure://{container}/key` - `https://{account}.blob.core.windows.net/{container}/key` **Configuration** The simplest way to configure object storage is by creating the standard [`~/.azure/config`](https://learn.microsoft.com/en-us/cli/azure/azure-cli-configuration?view=azure-cli-latest) file: ```bash $ cat ~/.azure/config [storage] account = devstoreaccount1 key = Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== ``` Alternatively, you can use the following environment variables when starting postgres to configure the Azure Blob Storage client: - `AZURE_STORAGE_ACCOUNT`: the storage account name of the Azure Blob - `AZURE_STORAGE_KEY`: the storage key of the Azure Blob - `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob - `AZURE_CONFIG_FILE`: an alternative location for the config file **Bonus** Additionally, PR supports following S3 uri forms: - `s3://{bucket}/key` - `s3a://{bucket}/key` - `https://s3.amazonaws.com/{bucket}/key` - `https://{bucket}.s3.amazonaws.com/key` Closes #50 --- .devcontainer/Dockerfile | 25 +- .devcontainer/devcontainer.json | 2 +- .devcontainer/scripts/setup_azurite.sh | 7 + .../{setup-minio.sh => setup_minio.sh} | 2 + .devcontainer/scripts/setup_test_envs.sh | 19 ++ .env_sample | 5 - .github/workflows/ci.yml | 42 ++- Cargo.lock | 39 ++- Cargo.toml | 5 +- README.md | 38 ++- src/arrow_parquet/uri_utils.rs | 217 ++++++++++++--- src/lib.rs | 263 +++++++++++++++++- 12 files changed, 560 insertions(+), 104 deletions(-) create mode 100644 .devcontainer/scripts/setup_azurite.sh rename .devcontainer/scripts/{setup-minio.sh => setup_minio.sh} (88%) create mode 100644 .devcontainer/scripts/setup_test_envs.sh delete mode 100644 .env_sample diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 3d3b802..9589833 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -6,10 +6,11 @@ ENV TZ="Europe/Istanbul" ARG PG_MAJOR=17 # install deps -RUN apt-get update && apt-get -y install build-essential libreadline-dev zlib1g-dev \ - flex bison libxml2-dev libxslt-dev libssl-dev \ - libxml2-utils xsltproc ccache pkg-config wget \ - curl lsb-release sudo nano net-tools git awscli +RUN apt-get update && apt-get -y install build-essential libreadline-dev zlib1g-dev \ + flex bison libxml2-dev libxslt-dev libssl-dev \ + libxml2-utils xsltproc ccache pkg-config wget \ + curl lsb-release ca-certificates gnupg sudo git \ + nano net-tools awscli # install Postgres RUN sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' @@ -19,6 +20,14 @@ RUN apt-get update && apt-get -y install postgresql-${PG_MAJOR}-postgis-3 \ postgresql-client-${PG_MAJOR} \ libpq-dev +# install azure-cli and azurite +RUN curl -fsSL https://deb.nodesource.com/setup_20.x | bash - +RUN apt-get update && apt-get install -y nodejs +RUN curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | tee /etc/apt/trusted.gpg.d/microsoft.gpg > /dev/null +RUN echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/trusted.gpg.d/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | tee /etc/apt/sources.list.d/azure-cli.list +RUN apt-get update && apt-get install -y azure-cli +RUN npm install -g azurite + # download and install MinIO server and client RUN wget https://dl.min.io/server/minio/release/linux-amd64/minio RUN chmod +x minio @@ -58,11 +67,3 @@ ARG PGRX_VERSION=0.12.6 RUN cargo install --locked cargo-pgrx@${PGRX_VERSION} RUN cargo pgrx init --pg${PG_MAJOR} $(which pg_config) RUN echo "shared_preload_libraries = 'pg_parquet'" >> $HOME/.pgrx/data-${PG_MAJOR}/postgresql.conf - -ENV MINIO_ROOT_USER=admin -ENV MINIO_ROOT_PASSWORD=admin123 -ENV AWS_S3_TEST_BUCKET=testbucket -ENV AWS_REGION=us-east-1 -ENV AWS_ACCESS_KEY_ID=admin -ENV AWS_SECRET_ACCESS_KEY=admin123 -ENV PG_PARQUET_TEST=true diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index efdd2d9..a81ca9e 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -15,7 +15,7 @@ ] } }, - "postStartCommand": "bash .devcontainer/scripts/setup-minio.sh", + "postStartCommand": "bash .devcontainer/scripts/setup_minio.sh && bash .devcontainer/scripts/setup_azurite.sh", "forwardPorts": [ 5432 ], diff --git a/.devcontainer/scripts/setup_azurite.sh b/.devcontainer/scripts/setup_azurite.sh new file mode 100644 index 0000000..cea6712 --- /dev/null +++ b/.devcontainer/scripts/setup_azurite.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +source setup_test_envs.sh + +nohup azurite --location /tmp/azurite-storage > /dev/null 2>&1 & + +az storage container create --name "${AZURE_TEST_CONTAINER_NAME}" --public off --connection-string "$AZURE_STORAGE_CONNECTION_STRING" diff --git a/.devcontainer/scripts/setup-minio.sh b/.devcontainer/scripts/setup_minio.sh similarity index 88% rename from .devcontainer/scripts/setup-minio.sh rename to .devcontainer/scripts/setup_minio.sh index d611b70..627e9c0 100644 --- a/.devcontainer/scripts/setup-minio.sh +++ b/.devcontainer/scripts/setup_minio.sh @@ -1,5 +1,7 @@ #!/bin/bash +source setup_test_envs.sh + nohup minio server /tmp/minio-storage > /dev/null 2>&1 & mc alias set local http://localhost:9000 $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD diff --git a/.devcontainer/scripts/setup_test_envs.sh b/.devcontainer/scripts/setup_test_envs.sh new file mode 100644 index 0000000..a47e856 --- /dev/null +++ b/.devcontainer/scripts/setup_test_envs.sh @@ -0,0 +1,19 @@ +# S3 tests +export AWS_ACCESS_KEY_ID=admin +export AWS_SECRET_ACCESS_KEY=admin123 +export AWS_REGION=us-east-1 +export AWS_S3_TEST_BUCKET=testbucket +export MINIO_ROOT_USER=admin +export MINIO_ROOT_PASSWORD=admin123 + +# Azure Blob tests +export AZURE_STORAGE_ACCOUNT=devstoreaccount1 +export AZURE_STORAGE_KEY="Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==" +export AZURE_STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;" +export AZURE_TEST_CONTAINER_NAME=testcontainer +export AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D" +export AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D" + +# Other +export PG_PARQUET_TEST=true +export RUST_TEST_THREADS=1 diff --git a/.env_sample b/.env_sample deleted file mode 100644 index c14097f..0000000 --- a/.env_sample +++ /dev/null @@ -1,5 +0,0 @@ -AWS_S3_TEST_BUCKET=testbucket -AWS_REGION=us-east-1 -AWS_ACCESS_KEY_ID=admin -AWS_SECRET_ACCESS_KEY=admin123 -PG_PARQUET_TEST=true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 403e54c..d3f2f21 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -70,12 +70,23 @@ jobs: sudo sh -c 'echo "deb https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | sudo apt-key add - sudo apt-get update - sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config + sudo apt-get -y install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev \ + libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config \ + gnupg ca-certificates sudo apt-get -y install postgresql-${{ env.PG_MAJOR }}-postgis-3 \ postgresql-server-dev-${{ env.PG_MAJOR }} \ postgresql-client-${{ env.PG_MAJOR }} \ libpq-dev + - name: Install Azurite + run: | + curl -fsSL https://deb.nodesource.com/setup_20.x | sudo bash - + sudo apt-get update && sudo apt-get install -y nodejs + curl -sL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/microsoft.gpg > /dev/null + echo "deb [arch=`dpkg --print-architecture` signed-by=/etc/apt/trusted.gpg.d/microsoft.gpg] https://packages.microsoft.com/repos/azure-cli/ `lsb_release -cs` main" | sudo tee /etc/apt/sources.list.d/azure-cli.list + sudo apt-get update && sudo apt-get install -y azure-cli + npm install -g azurite + - name: Install MinIO run: | # Download and install MinIO server and client @@ -108,23 +119,14 @@ jobs: $(pg_config --sharedir)/extension \ /var/run/postgresql/ - # pgrx tests with runas argument ignores environment variables, so - # we read env vars from .env file in tests (https://github.com/pgcentralfoundation/pgrx/pull/1674) - touch /tmp/.env - echo AWS_ACCESS_KEY_ID=${{ env.AWS_ACCESS_KEY_ID }} >> /tmp/.env - echo AWS_SECRET_ACCESS_KEY=${{ env.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env - echo AWS_S3_TEST_BUCKET=${{ env.AWS_S3_TEST_BUCKET }} >> /tmp/.env - echo AWS_REGION=${{ env.AWS_REGION }} >> /tmp/.env - echo PG_PARQUET_TEST=${{ env.PG_PARQUET_TEST }} >> /tmp/.env + # Set up test environments + source .devcontainer/scripts/setup_test_envs.sh # Start MinIO server - export MINIO_ROOT_USER=${{ env.AWS_ACCESS_KEY_ID }} - export MINIO_ROOT_PASSWORD=${{ env.AWS_SECRET_ACCESS_KEY }} - minio server /tmp/minio-storage > /dev/null 2>&1 & + bash .devcontainer/scripts/setup_minio.sh - # Set access key and create test bucket - mc alias set local http://localhost:9000 ${{ env.AWS_ACCESS_KEY_ID }} ${{ env.AWS_SECRET_ACCESS_KEY }} - aws --endpoint-url http://localhost:9000 s3 mb s3://${{ env.AWS_S3_TEST_BUCKET }} + # Start Azurite server + bash .devcontainer/scripts/setup_azurite.sh # Run tests with coverage tool source <(cargo llvm-cov show-env --export-prefix) @@ -135,13 +137,9 @@ jobs: # Stop MinIO server pkill -9 minio - env: - RUST_TEST_THREADS: 1 - AWS_ACCESS_KEY_ID: test_secret_access_key - AWS_SECRET_ACCESS_KEY: test_access_key_id - AWS_REGION: us-east-1 - AWS_S3_TEST_BUCKET: testbucket - PG_PARQUET_TEST: true + + # Stop Azurite server + pkill -9 node - name: Upload coverage report to Codecov if: ${{ env.PG_MAJOR }} == 17 diff --git a/Cargo.lock b/Cargo.lock index f5f042e..e263060 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -983,10 +983,13 @@ dependencies = [ ] [[package]] -name = "dotenvy" -version = "0.15.7" +name = "dlv-list" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] [[package]] name = "either" @@ -1962,6 +1965,16 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "outref" version = "0.5.1" @@ -2088,13 +2101,14 @@ dependencies = [ "arrow-schema", "aws-config", "aws-credential-types", - "dotenvy", "futures", + "home", "object_store", "once_cell", "parquet", "pgrx", "pgrx-tests", + "rust-ini", "tokio", "url", ] @@ -2591,6 +2605,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rust-ini" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f" +dependencies = [ + "cfg-if", + "ordered-multimap", + "trim-in-place", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -3345,6 +3370,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "trim-in-place" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 09b056f..12370d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,9 @@ arrow = {version = "53", default-features = false} arrow-schema = {version = "53", default-features = false} aws-config = { version = "1.5", default-features = false, features = ["rustls"]} aws-credential-types = {version = "1.2", default-features = false} -dotenvy = "0.15" futures = "0.3" -object_store = {version = "0.11", default-features = false, features = ["aws"]} +home = "0.5" +object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]} once_cell = "1" parquet = {version = "53", default-features = false, features = [ "arrow", @@ -38,6 +38,7 @@ parquet = {version = "53", default-features = false, features = [ "object_store", ]} pgrx = "=0.12.6" +rust-ini = "0.21" tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]} url = "2.5" diff --git a/README.md b/README.md index 7c679a1..544512e 100644 --- a/README.md +++ b/README.md @@ -155,7 +155,13 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM ``` ## Object Store Support -`pg_parquet` supports reading and writing Parquet files from/to `S3` object store. Only the uris with `s3://` scheme is supported. +`pg_parquet` supports reading and writing Parquet files from/to `S3` and `Azure Blob Storage` object stores. + +> [!NOTE] +> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. +> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. + +#### S3 Storage The simplest way to configure object storage is by creating the standard `~/.aws/credentials` and `~/.aws/config` files: @@ -178,9 +184,33 @@ Alternatively, you can use the following environment variables when starting pos - `AWS_CONFIG_FILE`: an alternative location for the config file - `AWS_PROFILE`: the name of the profile from the credentials and config file (default profile name is `default`) -> [!NOTE] -> To be able to write into a object store location, you need to grant `parquet_object_store_write` role to your current postgres user. -> Similarly, to read from an object store location, you need to grant `parquet_object_store_read` role to your current postgres user. +Supported S3 uri formats are shown below: +- s3:// \ / \ +- s3a:// \ / \ +- https:// \.s3.amazonaws.com / \ +- https:// s3.amazonaws.com / \ / \ + +#### Azure Blob Storage + +The simplest way to configure object storage is by creating the standard [`~/.azure/config`](https://learn.microsoft.com/en-us/cli/azure/azure-cli-configuration?view=azure-cli-latest) file: + +```bash +$ cat ~/.azure/config +[storage] +account = devstoreaccount1 +key = Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw== +``` + +Alternatively, you can use the following environment variables when starting postgres to configure the Azure Blob Storage client: +- `AZURE_STORAGE_ACCOUNT`: the storage account name of the Azure Blob +- `AZURE_STORAGE_KEY`: the storage key of the Azure Blob +- `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob +- `AZURE_CONFIG_FILE`: an alternative location for the config file + +Supported Azure Blob Storage uri formats are shown below: +- az:// \ / \ +- azure:// \ / \ +- https:// \.blob.core.windows.net / \ / \ ## Copy Options `pg_parquet` supports the following options in the `COPY TO` command: diff --git a/src/arrow_parquet/uri_utils.rs b/src/arrow_parquet/uri_utils.rs index e1dc475..534caa1 100644 --- a/src/arrow_parquet/uri_utils.rs +++ b/src/arrow_parquet/uri_utils.rs @@ -1,4 +1,7 @@ -use std::{sync::Arc, sync::LazyLock}; +use std::{ + panic, + sync::{Arc, LazyLock}, +}; use arrow::datatypes::SchemaRef; use aws_config::{ @@ -7,11 +10,14 @@ use aws_config::{ profile::{ProfileFileCredentialsProvider, ProfileFileRegionProvider}, }; use aws_credential_types::provider::ProvideCredentials; +use home::home_dir; +use ini::Ini; use object_store::{ aws::{AmazonS3, AmazonS3Builder}, + azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder}, local::LocalFileSystem, path::Path, - ObjectStore, + ObjectStore, ObjectStoreScheme, }; use parquet::{ arrow::{ @@ -44,60 +50,106 @@ pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock = LazyLock::new(|| .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e)) }); -fn parse_bucket_and_key(uri: &Url) -> (String, String) { - debug_assert!(uri.scheme() == "s3"); +fn parse_azure_blob_container(uri: &Url) -> Option { + let host = uri.host_str()?; - let bucket = uri - .host_str() - .unwrap_or_else(|| panic!("bucket not found in uri: {}", uri)); + // az(ure)://{container}/key + if uri.scheme() == "az" || uri.scheme() == "azure" { + return Some(host.to_string()); + } + // https://{account}.blob.core.windows.net/{container}/key + else if host.ends_with("blob.core.windows.net") { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); - let key = uri.path(); + if !path_segments.is_empty() { + return Some(path_segments[0].to_string()); + } else { + return None; + } + } - (bucket.to_string(), key.to_string()) + None +} + +fn parse_s3_bucket(uri: &Url) -> Option { + let host = uri.host_str()?; + + // s3(a)://{bucket}/key + if uri.scheme() == "s3" || uri.scheme() == "s3a" { + return Some(host.to_string()); + } + // https://s3.amazonaws.com/{bucket}/key + else if host == "s3.amazonaws.com" { + let path_segments: Vec<&str> = uri.path_segments()?.collect(); + if !path_segments.is_empty() { + return Some(path_segments[0].to_string()); // Bucket name is the first part of the path + } else { + return None; + } + } + // https://{bucket}.s3.amazonaws.com/key + else if host.ends_with("s3.amazonaws.com") { + let bucket_name = host.split('.').next()?; + return Some(bucket_name.to_string()); + } + + None } fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc, Path) { - if uri.scheme() == "s3" { - let (bucket_name, key) = parse_bucket_and_key(uri); + let (scheme, path) = + ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); - let storage_container = PG_BACKEND_TOKIO_RUNTIME - .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); + match scheme { + ObjectStoreScheme::AmazonS3 => { + let bucket_name = parse_s3_bucket(uri).unwrap_or_else(|| { + panic!("failed to parse bucket name from uri: {}", uri); + }); - let location = Path::from(key); + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) }); - (storage_container, location) - } else { - debug_assert!(uri.scheme() == "file"); - - let uri = uri_as_string(uri); - - if !copy_from { - // create or overwrite the local file - std::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(&uri) - .unwrap_or_else(|e| panic!("{}", e)); + (storage_container, path) + } + ObjectStoreScheme::MicrosoftAzure => { + let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| { + panic!("failed to parse container name from uri: {}", uri); + }); + + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_azure_object_store(&container_name).await) }); + + (storage_container, path) } + ObjectStoreScheme::Local => { + let uri = uri_as_string(uri); + + if !copy_from { + // create or overwrite the local file + std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(&uri) + .unwrap_or_else(|e| panic!("{}", e)); + } - let storage_container = Arc::new(LocalFileSystem::new()); + let storage_container = Arc::new(LocalFileSystem::new()); - let location = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); + let path = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e)); - (storage_container, location) + (storage_container, path) + } + _ => { + panic!("unsupported uri {}", uri); + } } } async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { - // try loading environment vars from the .env file - dotenvy::from_path("/tmp/.env").ok(); - let mut aws_s3_builder = AmazonS3Builder::new().with_bucket_name(bucket_name); - let is_test_running = std::env::var("PG_PARQUET_TEST").is_ok(); - - if is_test_running { + if is_testing() { // use minio for testing aws_s3_builder = aws_s3_builder.with_endpoint("http://localhost:9000"); aws_s3_builder = aws_s3_builder.with_allow_http(true); @@ -142,6 +194,78 @@ async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e)) } +async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure { + let mut azure_builder = MicrosoftAzureBuilder::new().with_container_name(container_name); + + if is_testing() { + // use azurite for testing + azure_builder = + azure_builder.with_endpoint("http://localhost:10000/devstoreaccount1".into()); + azure_builder = azure_builder.with_allow_http(true); + } + + // ~/.azure/config + let azure_config_file_path = std::env::var("AZURE_CONFIG_FILE").unwrap_or( + home_dir() + .expect("failed to get home directory") + .join(".azure") + .join("config") + .to_str() + .expect("failed to convert path to string") + .to_string(), + ); + + let azure_config_content = Ini::load_from_file(&azure_config_file_path).ok(); + + // storage account + let azure_blob_account = match std::env::var("AZURE_STORAGE_ACCOUNT") { + Ok(account) => Some(account), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("account")) + .map(|account| account.to_string()), + }; + + if let Some(azure_blob_account) = azure_blob_account { + azure_builder = azure_builder.with_account(azure_blob_account); + } + + // storage key + let azure_blob_key = match std::env::var("AZURE_STORAGE_KEY") { + Ok(key) => Some(key), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("key")) + .map(|key| key.to_string()), + }; + + if let Some(azure_blob_key) = azure_blob_key { + azure_builder = azure_builder.with_access_key(azure_blob_key); + } + + // sas token + let azure_blob_sas_token = match std::env::var("AZURE_STORAGE_SAS_TOKEN") { + Ok(token) => Some(token), + Err(_) => azure_config_content + .as_ref() + .and_then(|ini| ini.section(Some("storage"))) + .and_then(|section| section.get("sas_token")) + .map(|token| token.to_string()), + }; + + if let Some(azure_blob_sas_token) = azure_blob_sas_token { + azure_builder = azure_builder.with_config(AzureConfigKey::SasKey, azure_blob_sas_token); + } + + azure_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} + +fn is_testing() -> bool { + std::env::var("PG_PARQUET_TEST").is_ok() +} + pub(crate) fn parse_uri(uri: &str) -> Url { if !uri.contains("://") { // local file @@ -151,12 +275,25 @@ pub(crate) fn parse_uri(uri: &str) -> Url { let uri = Url::parse(uri).unwrap_or_else(|e| panic!("{}", e)); - if uri.scheme() != "s3" { + let (scheme, _) = + ObjectStoreScheme::parse(&uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); + + if scheme == ObjectStoreScheme::AmazonS3 { + parse_s3_bucket(&uri) + .unwrap_or_else(|| panic!("failed to parse bucket name from s3 uri {}", uri)); + } else if scheme == ObjectStoreScheme::MicrosoftAzure { + parse_azure_blob_container(&uri).unwrap_or_else(|| { + panic!( + "failed to parse container name from azure blob storage uri {}", + uri + ) + }); + } else { panic!( - "unsupported uri {}. Only local files and URIs with s3:// prefix are supported.", + "unsupported uri {}. Only Azure and S3 uris are supported.", uri ); - } + }; uri } diff --git a/src/lib.rs b/src/lib.rs index daaf057..b9272c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1197,24 +1197,220 @@ mod tests { } #[pg_test] - fn test_s3_object_store_from_env() { - dotenvy::from_path("/tmp/.env").unwrap(); + fn test_azure_blob_from_env() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uris = [ + format!("az://{}/pg_parquet_test.parquet", test_container_name), + format!("azure://{}/pg_parquet_test.parquet", test_container_name), + format!( + "https://{}.blob.core.windows.net/{}/pg_parquet_test.parquet", + test_account_name, test_container_name + ), + ]; - let test_bucket_name: String = - std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + for azure_blob_uri in azure_blob_uris { + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); - let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + } - let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + #[pg_test] + fn test_azure_from_config_file() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + // remove these to make sure the config file is used + let account_name = std::env::var("AZURE_STORAGE_ACCOUNT").unwrap(); + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + let account_key = std::env::var("AZURE_STORAGE_KEY").unwrap(); + std::env::remove_var("AZURE_STORAGE_KEY"); + + // create a config file + let azure_config_file_content = format!( + "[storage]\naccount = {}\nkey = {}\n", + account_name, account_key + ); + + let azure_config_file = "/tmp/azure_config"; + std::env::set_var("AZURE_CONFIG_FILE", azure_config_file); + + let mut azure_config_file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(azure_config_file) + .unwrap(); + + azure_config_file + .write_all(azure_config_file_content.as_bytes()) + .unwrap(); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + + #[pg_test] + #[should_panic(expected = "Account must be specified")] + fn test_azure_with_no_storage_account() { + std::env::remove_var("AZURE_STORAGE_ACCOUNT"); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let azure_blob_uri = format!("az://{}/pg_parquet_test.parquet", test_container_name); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_blob_with_wrong_storage_key() { + let wrong_account_key = String::from("FFy8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="); + std::env::set_var("AZURE_STORAGE_KEY", wrong_account_key); + + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}/pg_parquet_test.parquet", + test_account_name, test_container_name + ); + + let test_table = TestTable::::new("int4".into()).with_uri(azure_blob_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + + #[pg_test] + #[should_panic(expected = "404 Not Found")] + fn test_azure_blob_write_wrong_container() { + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/nonexistentcontainer/pg_parquet_test.parquet", + test_account_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + fn test_azure_blob_read_write_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let read_write_sas_token = std::env::var("AZURE_TEST_READ_WRITE_SAS") + .expect("AZURE_TEST_READ_WRITE_SAS not found"); + + // remove account key to make sure the sas token is used + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_write_sas_token); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}/pg_parquet_test.parquet", + test_account_name, test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_azure_blob_read_only_sas() { + let test_container_name: String = std::env::var("AZURE_TEST_CONTAINER_NAME") + .expect("AZURE_TEST_CONTAINER_NAME not found"); + + let test_account_name: String = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("AZURE_STORAGE_ACCOUNT not found"); + + let read_only_sas_token: String = + std::env::var("AZURE_TEST_READ_ONLY_SAS").expect("AZURE_TEST_READ_ONLY_SAS not found"); + + // remove account key to make sure the sas token is used + std::env::remove_var("AZURE_STORAGE_KEY"); + std::env::set_var("AZURE_STORAGE_SAS_TOKEN", read_only_sas_token); + + let azure_blob_uri = format!( + "https://{}.blob.core.windows.net/{}/pg_parquet_test.parquet", + test_account_name, test_container_name + ); + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';", + azure_blob_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "failed to parse container name")] + fn test_azure_blob_unsupported_uri() { + let fabric_azure_blob_uri = "https://ACCOUNT.dfs.fabric.microsoft.com".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(fabric_azure_blob_uri); test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_helper(test_table); } #[pg_test] - fn test_s3_object_store_from_config_file() { - dotenvy::from_path("/tmp/.env").unwrap(); + fn test_s3_from_env() { + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uris = [ + format!("s3://{}/pg_parquet_test.parquet", test_bucket_name), + format!("s3a://{}/pg_parquet_test.parquet", test_bucket_name), + format!( + "https://s3.amazonaws.com/{}/pg_parquet_test.parquet", + test_bucket_name + ), + format!( + "https://{}.s3.amazonaws.com/pg_parquet_test.parquet", + test_bucket_name + ), + ]; + for s3_uri in s3_uris { + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + } + + #[pg_test] + fn test_s3_from_config_file() { let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -1255,6 +1451,38 @@ mod tests { test_helper(test_table); } + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_with_wrong_access_key_id() { + std::env::set_var("AWS_ACCESS_KEY_ID", "wrong_access_key_id"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + + #[pg_test] + #[should_panic(expected = "403 Forbidden")] + fn test_s3_with_wrong_secret_access_key() { + std::env::set_var("AWS_SECRET_ACCESS_KEY", "wrong_secret_access_key"); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + #[pg_test] #[should_panic(expected = "permission denied to COPY from a remote uri")] fn test_s3_no_read_access() { @@ -1270,8 +1498,6 @@ mod tests { // set the current user to the regular user Spi::run("SET SESSION AUTHORIZATION regular_user;").unwrap(); - dotenvy::from_path("/tmp/.env").unwrap(); - let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -1312,8 +1538,6 @@ mod tests { // set the current user to the regular user Spi::run("SET SESSION AUTHORIZATION regular_user;").unwrap(); - dotenvy::from_path("/tmp/.env").unwrap(); - let test_bucket_name: String = std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); @@ -1341,7 +1565,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_write_invalid_uri() { + fn test_s3_write_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let copy_to_command = format!( @@ -1353,7 +1577,7 @@ mod tests { #[pg_test] #[should_panic(expected = "404 Not Found")] - fn test_s3_object_store_read_invalid_uri() { + fn test_s3_read_wrong_bucket() { let s3_uri = "s3://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; let create_table_command = "CREATE TABLE test_table (a int);"; @@ -1363,6 +1587,17 @@ mod tests { Spi::run(copy_from_command.as_str()).unwrap(); } + #[pg_test] + #[should_panic(expected = "failed to parse bucket name")] + fn test_s3_unsupported_uri() { + let cloudflare_s3_uri = "https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket".into(); + + let test_table = TestTable::::new("int4".into()).with_uri(cloudflare_s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + #[pg_test] #[should_panic(expected = "unsupported uri gs://testbucket")] fn test_unsupported_uri() {