From f187bc41ef791cef822c65d9ef500f675a863c39 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Wed, 23 Oct 2024 20:46:29 +0300 Subject: [PATCH] Adds support for COPY TO/FROM Google Cloud Storage Supports following Google Cloud Storage uri forms: - gs:// \ / \ **Configuration** The simplest way to configure object storage is by creating a json config file like [`/tmp/gcs.json`]: ```bash $ cat /tmp/gcs.json { "gcs_base_url": "gs://testbucket/test.parquet", "disable_oauth": false, "client_email": "...", "private_key_id": "...", "private_key": "..." } ``` Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client: - `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key - `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file Closes #62 --- .devcontainer/.env | 3 ++ .devcontainer/create-test-buckets.sh | 2 ++ .devcontainer/docker-compose.yml | 15 ++++++++ .github/workflows/ci.yml | 11 ++++++ .vscode/settings.json | 4 +-- Cargo.lock | 1 + Cargo.toml | 2 +- README.md | 22 ++++++++++++ src/arrow_parquet/uri_utils.rs | 52 ++++++++++++++++++++++++++-- src/pgrx_tests/object_store.rs | 45 +++++++++++++++++++++--- 10 files changed, 148 insertions(+), 9 deletions(-) diff --git a/.devcontainer/.env b/.devcontainer/.env index d94153e..ebc69c1 100644 --- a/.devcontainer/.env +++ b/.devcontainer/.env @@ -14,6 +14,9 @@ AZURE_TEST_CONTAINER_NAME=testcontainer AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D" AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D" +# GCS tests +GOOGLE_TEST_BUCKET=testbucket + # Others RUST_TEST_THREADS=1 PG_PARQUET_TEST=true diff --git a/.devcontainer/create-test-buckets.sh b/.devcontainer/create-test-buckets.sh index 9ad1360..4c45e61 100644 --- a/.devcontainer/create-test-buckets.sh +++ b/.devcontainer/create-test-buckets.sh @@ -3,3 +3,5 @@ aws --endpoint-url http://localhost:9000 s3 mb s3://$AWS_S3_TEST_BUCKET az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING + +curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml index 5147410..321f19f 100644 --- a/.devcontainer/docker-compose.yml +++ b/.devcontainer/docker-compose.yml @@ -12,6 +12,7 @@ services: - ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro - ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw - ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw + - ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw env_file: - .env @@ -20,6 +21,7 @@ services: depends_on: - minio - azurite + - fake-gcs-server minio: image: minio/minio @@ -45,3 +47,16 @@ services: interval: 6s timeout: 2s retries: 3 + + fake-gcs-server: + image: tustvold/fake-gcs-server + env_file: + - .env + network_mode: host + command: -scheme http -public-host localhost:4443 + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "http://localhost:4443"] + interval: 6s + timeout: 2s + retries: 3 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 723c037..46625b6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,6 +132,17 @@ jobs: az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING + - name: Start fake-gcs-server for Google Cloud Storage emulator tests + run: | + docker run -d --env-file .devcontainer/.env -p 4443:4443 tustvold/fake-gcs-server -scheme http -filesystem-root /tmp/gcs -public-host localhost:4443 + + while ! nc -z localhost 4443; do + echo "Waiting for localhost:4443..." + sleep 1 + done + + curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b" + - name: Run tests run: | # Run tests with coverage tool diff --git a/.vscode/settings.json b/.vscode/settings.json index f6ad919..f90c4ab 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,6 +5,6 @@ "rust-analyzer.checkOnSave": true, "editor.inlayHints.enabled": "offUnlessPressed", "files.watcherExclude": { - "**/target/**": true - } + "**/target/**": true + } } diff --git a/Cargo.lock b/Cargo.lock index b547a37..7305067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2093,6 +2093,7 @@ dependencies = [ "rand", "reqwest", "ring", + "rustls-pemfile 2.2.0", "serde", "serde_json", "snafu", diff --git a/Cargo.toml b/Cargo.toml index d71a000..87856e3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ aws-config = { version = "1.5", default-features = false, features = ["rustls"]} aws-credential-types = {version = "1.2", default-features = false} futures = "0.3" home = "0.5" -object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]} +object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]} once_cell = "1" parquet = {version = "53", default-features = false, features = [ "arrow", diff --git a/README.md b/README.md index 2060589..4f3b681 100644 --- a/README.md +++ b/README.md @@ -213,6 +213,28 @@ Supported Azure Blob Storage uri formats are shown below: - azure:// \ / \ - https:// \.blob.core.windows.net / \ +#### Google Cloud Storage + +The simplest way to configure object storage is by creating a json config file like [`/tmp/gcs.json`]: + +```bash +$ cat /tmp/gcs.json +{ + "gcs_base_url": "http://localhost:4443", + "disable_oauth": true, + "client_email": "", + "private_key_id": "", + "private_key": "" +} +``` + +Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client: +- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key +- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file + +Supported Google Cloud Storage uri formats are shown below: +- gs:// \ / \ + ## Copy Options `pg_parquet` supports the following options in the `COPY TO` command: - `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.]` extension, diff --git a/src/arrow_parquet/uri_utils.rs b/src/arrow_parquet/uri_utils.rs index 438bc35..9f4dd72 100644 --- a/src/arrow_parquet/uri_utils.rs +++ b/src/arrow_parquet/uri_utils.rs @@ -15,6 +15,7 @@ use ini::Ini; use object_store::{ aws::{AmazonS3, AmazonS3Builder}, azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder}, + gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}, local::LocalFileSystem, path::Path, ObjectStore, ObjectStoreScheme, @@ -96,6 +97,17 @@ fn parse_s3_bucket(uri: &Url) -> Option { None } +fn parse_gcs_bucket(uri: &Url) -> Option { + let host = uri.host_str()?; + + // gs://{bucket}/key + if uri.scheme() == "gs" { + return Some(host.to_string()); + } + + None +} + fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc, Path) { let (scheme, path) = ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri)); @@ -121,6 +133,16 @@ fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc { + let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| { + panic!("failed to parse bucket name from uri: {}", uri); + }); + + let storage_container = PG_BACKEND_TOKIO_RUNTIME + .block_on(async { Arc::new(get_gcs_object_store(&bucket_name).await) }); + + (storage_container, path) + } ObjectStoreScheme::Local => { let uri = uri_as_string(uri); @@ -262,6 +284,25 @@ async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure { azure_builder.build().unwrap_or_else(|e| panic!("{}", e)) } +async fn get_gcs_object_store(bucket_name: &str) -> GoogleCloudStorage { + let mut gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name); + + if is_testing() { + // use fake-gcp-server for testing + gcs_builder = gcs_builder.with_service_account_key( + "{ + \"gcs_base_url\": \"http://localhost:4443\", + \"disable_oauth\": true, + \"client_email\": \"\", + \"private_key_id\": \"\", + \"private_key\": \"\" + }", + ); + } + + gcs_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} + fn is_testing() -> bool { std::env::var("PG_PARQUET_TEST").is_ok() } @@ -284,13 +325,20 @@ pub(crate) fn parse_uri(uri: &str) -> Url { } else if scheme == ObjectStoreScheme::MicrosoftAzure { parse_azure_blob_container(&uri).unwrap_or_else(|| { panic!( - "failed to parse container name from azure blob storage uri {}", + "failed to parse container name from Azure Blob Storage uri {}", + uri + ) + }); + } else if scheme == ObjectStoreScheme::GoogleCloudStorage { + parse_gcs_bucket(&uri).unwrap_or_else(|| { + panic!( + "failed to parse bucket name from Google Cloud Storage uri {}", uri ) }); } else { panic!( - "unsupported uri {}. Only Azure and S3 uris are supported.", + "unsupported uri {}. Only Azure Blob Storage, S3 and Google Cloud Storage uris are supported.", uri ); }; diff --git a/src/pgrx_tests/object_store.rs b/src/pgrx_tests/object_store.rs index 89edffe..2e985f6 100644 --- a/src/pgrx_tests/object_store.rs +++ b/src/pgrx_tests/object_store.rs @@ -338,7 +338,7 @@ mod tests { ); let copy_to_command = format!( - "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;", + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);", azure_blob_uri ); Spi::run(copy_to_command.as_str()).unwrap(); @@ -365,7 +365,7 @@ mod tests { ); let copy_to_command = format!( - "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;", + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);", azure_blob_uri ); Spi::run(copy_to_command.as_str()).unwrap(); @@ -411,10 +411,47 @@ mod tests { } #[pg_test] - #[should_panic(expected = "unsupported uri gs://testbucket")] + fn test_gcs_from_env() { + let test_bucket_name: String = + std::env::var("GOOGLE_TEST_BUCKET").expect("GOOGLE_TEST_BUCKET not found"); + + let gcs_uri = format!("gs://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(gcs_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_table.assert_expected_and_result_rows(); + } + + #[pg_test] + #[should_panic(expected = "404 Not Found")] + fn test_gcs_write_wrong_bucket() { + let s3_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; + + let copy_to_command = format!( + "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';", + s3_uri + ); + Spi::run(copy_to_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "404 Not Found")] + fn test_gcs_read_wrong_bucket() { + let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet"; + + let create_table_command = "CREATE TABLE test_table (a int);"; + Spi::run(create_table_command).unwrap(); + + let copy_from_command = format!("COPY test_table FROM '{}';", gcs_uri); + Spi::run(copy_from_command.as_str()).unwrap(); + } + + #[pg_test] + #[should_panic(expected = "unsupported uri http://testbucket")] fn test_unsupported_uri() { let test_table = - TestTable::::new("int4".into()).with_uri("gs://testbucket".to_string()); + TestTable::::new("int4".into()).with_uri("http://testbucket".to_string()); test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); test_table.assert_expected_and_result_rows(); }