Adds Support for COPY TO/FROM Google Cloud Storage #61

Draft · wants to merge 7 commits into base: aykut/azure-blob-storage
5 changes: 5 additions & 0 deletions .devcontainer/.env
@@ -18,5 +18,10 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"

# GCS tests
GOOGLE_TEST_BUCKET=testbucket
GOOGLE_SERVICE_ACCOUNT_KEY='{"gcs_base_url": "http://localhost:4443","disable_oauth": true,"client_email": "","private_key_id": "","private_key": ""}'
GOOGLE_SERVICE_ENDPOINT=http://localhost:4443

# Others
RUST_TEST_THREADS=1
15 changes: 15 additions & 0 deletions .devcontainer/docker-compose.yml
@@ -12,6 +12,7 @@ services:
- ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro
- ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw
- ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw
- ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw
- ./entrypoint.sh:/entrypoint.sh
env_file:
- .env
@@ -20,6 +21,7 @@ services:
depends_on:
- minio
- azurite
- fake-gcs-server

minio:
image: minio/minio
@@ -47,3 +49,16 @@
interval: 6s
timeout: 2s
retries: 3

fake-gcs-server:
image: tustvold/fake-gcs-server
env_file:
- .env
network_mode: host
command: -scheme http -public-host localhost:4443
restart: unless-stopped
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "4443"]
interval: 6s
timeout: 2s
retries: 3
3 changes: 3 additions & 0 deletions .devcontainer/entrypoint.sh
@@ -5,4 +5,7 @@ trap "echo 'Caught termination signal. Exiting...'; exit 0" SIGINT SIGTERM
# create azurite container
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

# create fake-gcs bucket
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

sleep infinity
14 changes: 14 additions & 0 deletions .github/workflows/ci.yml
@@ -139,6 +139,20 @@ jobs:
# create container
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

- name: Start fake-gcs-server for Google Cloud Storage emulator tests
run: |
docker run -d \
--env-file .devcontainer/.env \
-p 4443:4443 \
tustvold/fake-gcs-server

while ! curl $GOOGLE_SERVICE_ENDPOINT; do
echo "Waiting for $GOOGLE_SERVICE_ENDPOINT..."
sleep 1
done

curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "$GOOGLE_SERVICE_ENDPOINT/storage/v1/b"

- name: Run tests
run: |
# Run tests with coverage tool
4 changes: 2 additions & 2 deletions .vscode/settings.json
@@ -5,6 +5,6 @@
"rust-analyzer.checkOnSave": true,
"editor.inlayHints.enabled": "offUnlessPressed",
"files.watcherExclude": {
"**/target/**": true
}
}
1 change: 1 addition & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -28,7 +28,7 @@ aws-credential-types = {version = "1", default-features = false}
aws-sdk-sts = "1"
futures = "0.3"
home = "0.5"
object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
once_cell = "1"
parquet = {version = "53", default-features = false, features = [
"arrow",
24 changes: 23 additions & 1 deletion README.md
@@ -156,7 +156,7 @@ SELECT uri, encode(key, 'escape') as key, encode(value, 'escape') as value FROM
```

## Object Store Support
`pg_parquet` supports reading and writing Parquet files from/to `S3`, `Azure Blob Storage` and `Google Cloud Storage` object stores.

> [!NOTE]
> To be able to write into an object store location, you need to grant the `parquet_object_store_write` role to your current postgres user.
@@ -219,6 +219,28 @@ Supported Azure Blob Storage uri formats are shown below:
- azure:// \<container\> / \<path\>
- https:// \<account\>.blob.core.windows.net / \<container\>

#### Google Cloud Storage

The simplest way to configure Google Cloud Storage is by creating a JSON config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
"gcs_base_url": "http://localhost:4443",
"disable_oauth": true,
"client_email": "",
"private_key_id": "",
"private_key": ""
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: JSON-serialized service account key **(only via environment variables)**
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file **(only via environment variables)**

Supported Google Cloud Storage uri formats are shown below:
- gs:// \<bucket\> / \<path\>
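Once the client is configured, Parquet files can be copied to and from a bucket. A hypothetical session might look like this (the bucket, path, and table names below are placeholders, not part of this patch):

```sql
-- assumes pg_parquet is installed and GCS credentials are configured as above
COPY my_table TO 'gs://mybucket/my_table.parquet' WITH (format parquet);
COPY my_table FROM 'gs://mybucket/my_table.parquet';
```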

## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
Expand Down
40 changes: 38 additions & 2 deletions src/arrow_parquet/uri_utils.rs
@@ -11,6 +11,7 @@ use ini::Ini;
use object_store::{
aws::{AmazonS3, AmazonS3Builder},
azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder},
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
local::LocalFileSystem,
path::Path,
ObjectStore, ObjectStoreScheme,
@@ -92,6 +93,17 @@ fn parse_s3_bucket(uri: &Url) -> Option<String> {
None
}

fn parse_gcs_bucket(uri: &Url) -> Option<String> {
let host = uri.host_str()?;

// gs://{bucket}/key
if uri.scheme() == "gs" {
return Some(host.to_string());
}

None
}
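As a dependency-free illustration of the rule `parse_gcs_bucket` implements (the real code above uses `url::Url` for host parsing; this helper is hypothetical and only for exposition):

```rust
// Simplified sketch of the gs://{bucket}/key parsing rule, using only std.
// Not the actual implementation, which relies on url::Url::host_str().
fn parse_gcs_bucket_sketch(uri: &str) -> Option<String> {
    // Only the gs:// scheme maps to a GCS bucket.
    let rest = uri.strip_prefix("gs://")?;
    // The bucket is everything up to the first path separator.
    let bucket = rest.split('/').next()?;
    if bucket.is_empty() {
        return None;
    }
    Some(bucket.to_string())
}

fn main() {
    assert_eq!(
        parse_gcs_bucket_sketch("gs://testbucket/pg_parquet_test.parquet"),
        Some("testbucket".to_string())
    );
    assert_eq!(parse_gcs_bucket_sketch("s3://testbucket/key"), None);
    assert_eq!(parse_gcs_bucket_sketch("gs://"), None);
}
```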

fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
let (scheme, path) =
ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri));
@@ -117,6 +129,16 @@ fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStor

(storage_container, path)
}
ObjectStoreScheme::GoogleCloudStorage => {
let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
panic!("failed to parse bucket name from uri: {}", uri);
});

let storage_container = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { Arc::new(get_gcs_object_store(&bucket_name).await) });

(storage_container, path)
}
ObjectStoreScheme::Local => {
let uri = uri_as_string(uri);

@@ -245,6 +267,13 @@ async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure {
azure_builder.build().unwrap_or_else(|e| panic!("{}", e))
}

async fn get_gcs_object_store(bucket_name: &str) -> GoogleCloudStorage {
GoogleCloudStorageBuilder::from_env()
.with_bucket_name(bucket_name)
.build()
.unwrap_or_else(|e| panic!("{}", e))
}

pub(crate) fn parse_uri(uri: &str) -> Url {
if !uri.contains("://") {
// local file
@@ -263,13 +292,20 @@
} else if scheme == ObjectStoreScheme::MicrosoftAzure {
parse_azure_blob_container(&uri).unwrap_or_else(|| {
panic!(
"failed to parse container name from Azure Blob Storage uri {}",
uri
)
});
} else if scheme == ObjectStoreScheme::GoogleCloudStorage {
parse_gcs_bucket(&uri).unwrap_or_else(|| {
panic!(
"failed to parse bucket name from Google Cloud Storage uri {}",
uri
)
});
} else {
panic!(
"unsupported uri {}. Only Azure Blob Storage, S3 and Google Cloud Storage uris are supported.",
uri
);
};
45 changes: 41 additions & 4 deletions src/pgrx_tests/object_store.rs
@@ -416,7 +416,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
@@ -443,7 +443,7 @@
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
@@ -489,10 +489,47 @@
}

#[pg_test]
fn test_gcs_from_env() {
let test_bucket_name: String =
std::env::var("GOOGLE_TEST_BUCKET").expect("GOOGLE_TEST_BUCKET not found");

let gcs_uri = format!("gs://{}/pg_parquet_test.parquet", test_bucket_name);

let test_table = TestTable::<i32>::new("int4".into()).with_uri(gcs_uri);

test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_write_wrong_bucket() {
let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}';",
gcs_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_read_wrong_bucket() {
let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

let create_table_command = "CREATE TABLE test_table (a int);";
Spi::run(create_table_command).unwrap();

let copy_from_command = format!("COPY test_table FROM '{}';", gcs_uri);
Spi::run(copy_from_command.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "unsupported uri http://testbucket")]
fn test_unsupported_uri() {
let test_table =
TestTable::<i32>::new("int4".into()).with_uri("http://testbucket".to_string());
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}