From 7e62fa44ae3f0cf7bf6c3ae5092867ab495649d8 Mon Sep 17 00:00:00 2001 From: Aykut Bozkurt Date: Tue, 24 Sep 2024 18:42:33 +0300 Subject: [PATCH] support reading s3 config from config file --- .env_sample | 5 ++ .github/workflows/ci.yml | 50 +++++++++++++++-- Cargo.toml | 2 + src/arrow_parquet/uri_utils.rs | 98 +++++++++++++++++++++++++++------- src/lib.rs | 45 +++++++++++++++- 5 files changed, 174 insertions(+), 26 deletions(-) create mode 100644 .env_sample diff --git a/.env_sample b/.env_sample new file mode 100644 index 0000000..8c3ff32 --- /dev/null +++ b/.env_sample @@ -0,0 +1,5 @@ +AWS_S3_TEST_BUCKET=testbucket +AWS_REGION=us-east-1 +AWS_SECRET_ACCESS_KEY=admin +AWS_ACCESS_KEY_ID=admin123 +PG_PARQUET_TEST=true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9d3d14e..0656eb5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,6 +8,13 @@ concurrency: group: ${{ github.ref }} cancel-in-progress: true +env: + AWS_ACCESS_KEY_ID: test_secret_access_key + AWS_SECRET_ACCESS_KEY: test_access_key_id + AWS_REGION: us-east-1 + AWS_S3_TEST_BUCKET: testbucket + PG_PARQUET_TEST: true + jobs: build-and-test: runs-on: ubuntu-latest @@ -33,6 +40,25 @@ jobs: sudo apt-get install build-essential libreadline-dev zlib1g-dev flex bison libxml2-dev libxslt-dev libssl-dev libxml2-utils xsltproc ccache pkg-config sudo apt-get -y install postgresql-16-postgis-3 libpq-dev postgresql-server-dev-16 postgresql-client-16 + - name: Install MinIO + run: | + # Download and install MinIO server and client + wget https://dl.min.io/server/minio/release/linux-amd64/$MINIO_VERSION + chmod +x $MINIO_VERSION + mv $MINIO_VERSION /usr/local/bin/minio + echo "$MINIO_SHA256 /usr/local/bin/minio" | sha256sum --check + + # Download and install MinIO admin + wget https://dl.min.io/client/mc/release/linux-amd64/$MINIO_ADMIN_VERSION + chmod +x $MINIO_ADMIN_VERSION + mv $MINIO_ADMIN_VERSION /usr/local/bin/mc + echo "$MINIO_ADMIN_SHA256 /usr/local/bin/mc" | sha256sum --check + env: + MINIO_VERSION: "minio.RELEASE.2024-09-22T00-33-43Z" + MINIO_SHA256: "dea08573980057d84c14d5c55926e10b91fb2993a99696ff136fb0bddaa7c98f" + MINIO_ADMIN_VERSION: "mc.RELEASE.2024-09-16T17-43-14Z" + MINIO_ADMIN_SHA256: "9a9e7d32c175f2804d6880d5ad3623097ea439f0e0304aa6039874d0f0c493d8" + - name: Install and configure pgrx run: | cargo install --locked cargo-pgrx@0.12.4 @@ -48,14 +74,28 @@ jobs: - name: Create .env file run: | touch /tmp/.env - echo AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} >> /tmp/.env - echo AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env - echo AWS_REGION=${{ secrets.AWS_REGION }} >> /tmp/.env - echo AWS_S3_TEST_BUCKET=${{ secrets.AWS_S3_TEST_BUCKET }} >> /tmp/.env + echo AWS_ACCESS_KEY_ID=${{ env.AWS_ACCESS_KEY_ID }} >> /tmp/.env + echo AWS_SECRET_ACCESS_KEY=${{ env.AWS_SECRET_ACCESS_KEY }} >> /tmp/.env + echo AWS_REGION=${{ env.AWS_REGION }} >> /tmp/.env + echo AWS_S3_TEST_BUCKET=${{ env.AWS_S3_TEST_BUCKET }} >> /tmp/.env + echo PG_PARQUET_TEST=${{ env.PG_PARQUET_TEST }} >> /tmp/.env - name: Run tests run: | + # Start MinIO server + export MINIO_ROOT_USER=${{ env.AWS_ACCESS_KEY_ID }} + export MINIO_ROOT_PASSWORD=${{ env.AWS_SECRET_ACCESS_KEY }} + minio server /tmp/minio-storage > /dev/null 2>&1 & + + # Set access key and create test bucket + mc alias set local http://localhost:9000 ${{ env.AWS_ACCESS_KEY_ID }} ${{ env.AWS_SECRET_ACCESS_KEY }} + aws --endpoint-url http://localhost:9000 s3 mb s3://${{ env.AWS_S3_TEST_BUCKET }} + + # Run tests with coverage tool cargo llvm-cov test --lcov --output-path lcov.info + + # Stop MinIO server + pkill -9 minio env: RUST_TEST_THREADS: 1 CARGO_PGRX_TEST_RUNAS: postgres @@ -64,7 +104,7 @@ jobs: - name: Upload coverage report to Codecov uses: codecov/codecov-action@v4 with: - fail_ci_if_error: true + fail_ci_if_error: false files: ./lcov.info flags: pgrxtests token: ${{ secrets.CODECOV_TOKEN }} diff --git a/Cargo.toml b/Cargo.toml index cacf6f3..d7c10e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ pg_test = [] [dependencies] arrow = {version = "53", default-features = false} arrow-schema = {version = "53", default-features = false} +aws-config = { version = "1.5", default-features = false, features = ["rustls"]} +aws-credential-types = {version = "1.2", default-features = false} dotenvy = "0.15" futures = "0.3" object_store = {version = "0.11", default-features = false, features = ["aws"]} diff --git a/src/arrow_parquet/uri_utils.rs b/src/arrow_parquet/uri_utils.rs index c103b54..4c9315e 100644 --- a/src/arrow_parquet/uri_utils.rs +++ b/src/arrow_parquet/uri_utils.rs @@ -1,7 +1,14 @@ use std::{str::FromStr, sync::Arc}; use arrow::datatypes::SchemaRef; -use object_store::aws::AmazonS3Builder; +use aws_config::environment::{ + EnvironmentVariableCredentialsProvider, EnvironmentVariableRegionProvider, +}; +use aws_config::meta::credentials::CredentialsProviderChain; +use aws_config::meta::region::RegionProviderChain; +use aws_config::profile::{ProfileFileCredentialsProvider, ProfileFileRegionProvider}; +use aws_credential_types::provider::ProvideCredentials; +use object_store::aws::{AmazonS3, AmazonS3Builder}; use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; use parquet::arrow::arrow_to_parquet_schema; use parquet::arrow::async_writer::ParquetObjectWriter; @@ -55,19 +62,27 @@ async fn object_store_with_location(uri: &str) -> (Arc, Path) { match uri_format { UriFormat::File => { + // create or overwrite the local file + std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open(uri) + .unwrap_or_else(|e| panic!("{}", e)); + let storage_container = Arc::new(LocalFileSystem::new()); + let location = Path::from_filesystem_path(uri).unwrap_or_else(|e| panic!("{}", e)); + (storage_container, location) } UriFormat::S3 => { - let (bucket, key) = parse_bucket_and_key(uri); - let storage_container = Arc::new( - AmazonS3Builder::from_env() - .with_bucket_name(bucket) - .build() - .unwrap_or_else(|e| panic!("{}", e)), - ); + let (bucket_name, key) = parse_bucket_and_key(uri); + + let storage_container = Arc::new(get_s3_object_store(&bucket_name).await); + let location = Path::from(key); + (storage_container, location) } } @@ -127,18 +142,6 @@ pub(crate) async fn parquet_writer_from_uri( arrow_schema: SchemaRef, writer_props: WriterProperties, ) -> AsyncArrowWriter { - let uri_format = UriFormat::from_str(uri).unwrap_or_else(|e| panic!("{}", e)); - - if uri_format == UriFormat::File { - // we overwrite the local file if it exists - std::fs::OpenOptions::new() - .write(true) - .truncate(true) - .create(true) - .open(uri) - .unwrap_or_else(|e| panic!("{}", e)); - } - let (parquet_object_store, location) = object_store_with_location(uri).await; let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location); @@ -146,3 +149,58 @@ pub(crate) async fn parquet_writer_from_uri( AsyncArrowWriter::try_new(parquet_object_writer, arrow_schema, Some(writer_props)) .unwrap_or_else(|e| panic!("failed to create parquet writer for uri {}: {}", uri, e)) } + +pub async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 { + // try loading the .env file + dotenvy::from_path("/tmp/.env").ok(); + + let mut aws_s3_builder = AmazonS3Builder::new().with_bucket_name(bucket_name); + + let is_test_running = std::env::var("PG_PARQUET_TEST").is_ok(); + + if is_test_running { + // use minio for testing + aws_s3_builder = aws_s3_builder.with_endpoint("http://localhost:9000"); + aws_s3_builder = aws_s3_builder.with_allow_http(true); + } + + let aws_profile_name = std::env::var("AWS_PROFILE").unwrap_or("default".to_string()); + + // load the region from the environment variables or the profile file + let region_provider = RegionProviderChain::first_try(EnvironmentVariableRegionProvider::new()) + .or_else( + ProfileFileRegionProvider::builder() + .profile_name(aws_profile_name.clone()) + .build(), + ); + + let region = region_provider.region().await; + + if let Some(region) = region { + aws_s3_builder = aws_s3_builder.with_region(region.to_string()); + } + + // load the credentials from the environment variables or the profile file + let credential_provider = CredentialsProviderChain::first_try( + "Environment", + EnvironmentVariableCredentialsProvider::new(), + ) + .or_else( + "Profile", + ProfileFileCredentialsProvider::builder() + .profile_name(aws_profile_name) + .build(), + ); + + if let Ok(credentials) = credential_provider.provide_credentials().await { + aws_s3_builder = aws_s3_builder.with_access_key_id(credentials.access_key_id()); + + aws_s3_builder = aws_s3_builder.with_secret_access_key(credentials.secret_access_key()); + + if let Some(token) = credentials.session_token() { + aws_s3_builder = aws_s3_builder.with_token(token); + } + } + + aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e)) +} diff --git a/src/lib.rs b/src/lib.rs index fb09710..c03fa87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ pub extern "C" fn _PG_init() { #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { + use std::io::Write; use std::marker::PhantomData; use std::{collections::HashMap, fmt::Debug}; @@ -1116,7 +1117,7 @@ mod tests { } #[pg_test] - fn test_s3_object_store() { + fn test_s3_object_store_from_env() { dotenvy::from_path("/tmp/.env").unwrap(); let test_bucket_name: String = @@ -1130,6 +1131,48 @@ mod tests { test_helper(test_table); } + #[pg_test] + fn test_s3_object_store_from_config_file() { + dotenvy::from_path("/tmp/.env").unwrap(); + + let test_bucket_name: String = + std::env::var("AWS_S3_TEST_BUCKET").expect("AWS_S3_TEST_BUCKET not found"); + + // remove these to make sure the config file is used + let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap(); + std::env::remove_var("AWS_ACCESS_KEY_ID"); + let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap(); + std::env::remove_var("AWS_SECRET_ACCESS_KEY"); + let region = std::env::var("AWS_REGION").unwrap(); + std::env::remove_var("AWS_REGION"); + + // create a config file + std::env::set_var("AWS_PROFILE", "pg_parquet_test"); + let config_file_content = format!( + "[profile pg_parquet_test]\nregion = {}\naws_access_key_id = {}\naws_secret_access_key = {}\n", + region, access_key_id, secret_access_key + ); + + let config_file = format!("{}/.aws/config", std::env::var("HOME").unwrap()); + let config_file = std::path::Path::new(&config_file); + + let mut config_file = std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(config_file) + .unwrap_or_else(|e| panic!("{}", e)); + config_file + .write_all(config_file_content.as_bytes()) + .unwrap(); + + let s3_uri = format!("s3://{}/pg_parquet_test.parquet", test_bucket_name); + + let test_table = TestTable::::new("int4".into()).with_uri(s3_uri); + + test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);"); + test_helper(test_table); + } + #[pg_test] #[should_panic(expected = "404 Not Found")] fn test_s3_object_store_write_invalid_uri() {