Skip to content

Commit

Permalink
Adds support for COPY TO/FROM Google Cloud Storage
Browse files Browse the repository at this point in the history
Supports following Google Cloud Storage uri forms:
- gs:// \<bucket\> / \<path\>

**Configuration**

The simplest way to configure object storage is by creating a json config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
  "gcs_base_url": "https://storage.googleapis.com",
  "disable_oauth": false,
  "client_email": "...",
  "private_key_id": "...",
  "private_key": "..."
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file

Closes #62
  • Loading branch information
aykut-bozkurt committed Dec 3, 2024
1 parent 8553677 commit f187bc4
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .devcontainer/.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"

# GCS tests
GOOGLE_TEST_BUCKET=testbucket

# Others
RUST_TEST_THREADS=1
PG_PARQUET_TEST=true
2 changes: 2 additions & 0 deletions .devcontainer/create-test-buckets.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
aws --endpoint-url http://localhost:9000 s3 mb s3://$AWS_S3_TEST_BUCKET

az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
15 changes: 15 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ services:
- ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro
- ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw
- ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw
- ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw

env_file:
- .env
Expand All @@ -20,6 +21,7 @@ services:
depends_on:
- minio
- azurite
- fake-gcs-server

minio:
image: minio/minio
Expand All @@ -45,3 +47,16 @@ services:
interval: 6s
timeout: 2s
retries: 3

fake-gcs-server:
image: tustvold/fake-gcs-server
env_file:
- .env
network_mode: host
command: -scheme http -public-host localhost:4443
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "http://localhost:4443"]
interval: 6s
timeout: 2s
retries: 3
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@ jobs:
az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING
- name: Start fake-gcs-server for Google Cloud Storage emulator tests
run: |
docker run -d --env-file .devcontainer/.env -p 4443:4443 tustvold/fake-gcs-server -scheme http -filesystem-root /tmp/gcs -public-host localhost:4443
while ! nc -z localhost 4443; do
echo "Waiting for localhost:4443..."
sleep 1
done
curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
- name: Run tests
run: |
# Run tests with coverage tool
Expand Down
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"rust-analyzer.checkOnSave": true,
"editor.inlayHints.enabled": "offUnlessPressed",
"files.watcherExclude": {
"**/target/**": true
}
"**/target/**": true
}
}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ aws-config = { version = "1.5", default-features = false, features = ["rustls"]}
aws-credential-types = {version = "1.2", default-features = false}
futures = "0.3"
home = "0.5"
object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
once_cell = "1"
parquet = {version = "53", default-features = false, features = [
"arrow",
Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,28 @@ Supported Azure Blob Storage uri formats are shown below:
- azure:// \<container\> / \<path\>
- https:// \<account\>.blob.core.windows.net / \<container\>

#### Google Cloud Storage

The simplest way to configure object storage is by creating a json config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
"gcs_base_url": "http://localhost:4443",
"disable_oauth": true,
"client_email": "",
"private_key_id": "",
"private_key": ""
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file

Supported Google Cloud Storage uri formats are shown below:
- gs:// \<bucket\> / \<path\>

## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
Expand Down
52 changes: 50 additions & 2 deletions src/arrow_parquet/uri_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use ini::Ini;
use object_store::{
aws::{AmazonS3, AmazonS3Builder},
azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder},
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
local::LocalFileSystem,
path::Path,
ObjectStore, ObjectStoreScheme,
Expand Down Expand Up @@ -96,6 +97,17 @@ fn parse_s3_bucket(uri: &Url) -> Option<String> {
None
}

/// Extracts the bucket name from a Google Cloud Storage uri.
///
/// Only the `gs://{bucket}/{path}` form is recognized; any other scheme
/// yields `None`, as does a uri that lacks a host component.
fn parse_gcs_bucket(uri: &Url) -> Option<String> {
    match uri.scheme() {
        // gs://{bucket}/key
        "gs" => uri.host_str().map(str::to_string),
        _ => None,
    }
}

fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
let (scheme, path) =
ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri));
Expand All @@ -121,6 +133,16 @@ fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStor

(storage_container, path)
}
ObjectStoreScheme::GoogleCloudStorage => {
let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
panic!("failed to parse bucket name from uri: {}", uri);
});

let storage_container = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { Arc::new(get_gcs_object_store(&bucket_name).await) });

(storage_container, path)
}
ObjectStoreScheme::Local => {
let uri = uri_as_string(uri);

Expand Down Expand Up @@ -262,6 +284,25 @@ async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure {
azure_builder.build().unwrap_or_else(|e| panic!("{}", e))
}

/// Builds a [`GoogleCloudStorage`] client for the given bucket.
///
/// Configuration is read from the environment via
/// `GoogleCloudStorageBuilder::from_env` (e.g. `GOOGLE_SERVICE_ACCOUNT_KEY`
/// or `GOOGLE_SERVICE_ACCOUNT_PATH`).
///
/// # Panics
/// Panics if the builder fails to construct the client (e.g. invalid or
/// missing credentials).
async fn get_gcs_object_store(bucket_name: &str) -> GoogleCloudStorage {
    let mut gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);

    if is_testing() {
        // Under tests, point the client at the local fake-gcs-server emulator
        // and disable OAuth so no real service account is needed.
        gcs_builder = gcs_builder.with_service_account_key(
            "{
            \"gcs_base_url\": \"http://localhost:4443\",
            \"disable_oauth\": true,
            \"client_email\": \"\",
            \"private_key_id\": \"\",
            \"private_key\": \"\"
            }",
        );
    }

    gcs_builder.build().unwrap_or_else(|e| panic!("{}", e))
}

fn is_testing() -> bool {
std::env::var("PG_PARQUET_TEST").is_ok()
}
Expand All @@ -284,13 +325,20 @@ pub(crate) fn parse_uri(uri: &str) -> Url {
} else if scheme == ObjectStoreScheme::MicrosoftAzure {
parse_azure_blob_container(&uri).unwrap_or_else(|| {
panic!(
"failed to parse container name from azure blob storage uri {}",
"failed to parse container name from Azure Blob Storage uri {}",
uri
)
});
} else if scheme == ObjectStoreScheme::GoogleCloudStorage {
parse_gcs_bucket(&uri).unwrap_or_else(|| {
panic!(
"failed to parse bucket name from Google Cloud Storage uri {}",
uri
)
});
} else {
panic!(
"unsupported uri {}. Only Azure and S3 uris are supported.",
"unsupported uri {}. Only Azure Blob Storage, S3 and Google Cloud Storage uris are supported.",
uri
);
};
Expand Down
45 changes: 41 additions & 4 deletions src/pgrx_tests/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
Expand All @@ -365,7 +365,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
Expand Down Expand Up @@ -411,10 +411,47 @@ mod tests {
}

#[pg_test]
#[should_panic(expected = "unsupported uri gs://testbucket")]
fn test_gcs_from_env() {
let test_bucket_name: String =
std::env::var("GOOGLE_TEST_BUCKET").expect("GOOGLE_TEST_BUCKET not found");

let gcs_uri = format!("gs://{}/pg_parquet_test.parquet", test_bucket_name);

let test_table = TestTable::<i32>::new("int4".into()).with_uri(gcs_uri);

test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}

/// COPY TO a GCS bucket that does not exist must surface the server's
/// 404 error instead of silently succeeding.
#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_write_wrong_bucket() {
    // Renamed from `s3_uri` (copy-paste slip from the S3 tests): this is a GCS uri.
    let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

    let copy_to_command = format!(
        "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';",
        gcs_uri
    );
    Spi::run(copy_to_command.as_str()).unwrap();
}

/// COPY FROM a GCS bucket that does not exist must fail with the
/// server's 404 error.
#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_read_wrong_bucket() {
    Spi::run("CREATE TABLE test_table (a int);").unwrap();

    let missing_bucket_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

    let copy_from_command = format!("COPY test_table FROM '{}';", missing_bucket_uri);
    Spi::run(copy_from_command.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "unsupported uri http://testbucket")]
fn test_unsupported_uri() {
let test_table =
TestTable::<i32>::new("int4".into()).with_uri("gs://testbucket".to_string());
TestTable::<i32>::new("int4".into()).with_uri("http://testbucket".to_string());
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}
Expand Down

0 comments on commit f187bc4

Please sign in to comment.