diff --git a/README.md b/README.md
index f7cf94e..4f7b594 100644
--- a/README.md
+++ b/README.md
@@ -77,6 +77,12 @@ A lot of pre-filtering involves trimming down your dataset based on the values
 
     a key, partition)
 
+## To Test
+```
+bash dev_env --build
+pytest tests/
+```
+
 ## Redshift Spectrum
 Dataframes published to S3 can optionally be queried in AWS Redshift Spectrum. To enable this functionality, you must have an external database configured in Redshift. See the [AWS docs](https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html) for help setting up a database in Redshift. To enable this functionality in S3parq, simply pass a dictionary of configurations to `publish()` via the redshift_params argument.
 
diff --git a/requirements.txt b/requirements.txt
index 996f3eb..f9bc1e7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
-pandas==0.24.2
-pyarrow==0.13.0
-boto3==1.9.177
-s3fs==0.2.1
+pandas==1.2.4
+pyarrow==4.0.0
+boto3==1.17.58
+s3fs==0.4.2
 dfmock==0.0.14
 moto==1.3.8
 psycopg2==2.8.3
diff --git a/s3parq/publish_parq.py b/s3parq/publish_parq.py
index fc52f94..2fb5271 100644
--- a/s3parq/publish_parq.py
+++ b/s3parq/publish_parq.py
@@ -110,6 +110,7 @@ def validate_redshift_params(redshift_params: dict) -> dict:
             - port (str): Redshift Spectrum port to use
             - db_name (str): Redshift Spectrum database name to use
             - ec2_user (str): If on ec2, the user that should be used
+            - read_access_user (str): Name of user getting READ access on a schema
 
     Returns:
         The given redshift_params, with table and schema names lowercase
@@ -120,7 +121,7 @@ def validate_redshift_params(redshift_params: dict) -> dict:
         ValueError: If redshift_params is missing any of the above attributes
     """
     expected_params = ["schema_name", "table_name", "iam_role",
-                       "region", "cluster_id", "host", "port", "db_name", "ec2_user"]
+                       "region", "cluster_id", "host", "port", "db_name", "ec2_user", "read_access_user"]
     logger.debug("Checking redshift params are correctly formatted")
     if len(redshift_params) != len(expected_params):
         params_length_message = f"Expected parameters: {len(expected_params)}. Received: {len(redshift_params)}"
@@ -513,7 +514,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram
         session_helper.configure_session_helper()
 
         publish_redshift.create_schema(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, redshift_params['read_access_user'])
 
         logger.debug(
             f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")
@@ -618,7 +619,7 @@ def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.D
         session_helper.configure_session_helper()
 
         publish_redshift.create_schema(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, redshift_params['read_access_user'])
 
         logger.debug(
             f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")
diff --git a/s3parq/publish_redshift.py b/s3parq/publish_redshift.py
index 679feb4..aa8f73f 100644
--- a/s3parq/publish_redshift.py
+++ b/s3parq/publish_redshift.py
@@ -178,7 +178,7 @@ def _datatype_mapper(columns: dict) -> dict:
     return f"({sql_statement[:-2]})"  # Slice off the last space and comma
 
 
-def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper) -> None:
+def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper, read_access_user=None) -> None:
     """ Creates a schema in AWS redshift using a given iam_role
 
     Args:
@@ -198,6 +198,13 @@ def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper:
         logger.info(f'Running query to create schema: {new_schema_query}')
         scope.execute(new_schema_query)
 
+        if read_access_user:
+            grant_access_query = f"GRANT USAGE ON SCHEMA {schema_name} TO {read_access_user};\
+                GRANT SELECT ON ALL TABLES IN SCHEMA {schema_name} TO {read_access_user};\
+                ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_name} GRANT SELECT ON TABLES TO {read_access_user};"
+            logger.info(f'Running query to grant access to schema: {grant_access_query}')
+            scope.execute(grant_access_query)
+
 
 def create_table(table_name: str, schema_name: str, columns: dict, partitions: dict, path: str, session_helper: SessionHelper) -> None:
     """ Creates a table in AWS redshift. The table will be named
diff --git a/tests/test_publish_parq.py b/tests/test_publish_parq.py
index 1190cdd..693fbcc 100644
--- a/tests/test_publish_parq.py
+++ b/tests/test_publish_parq.py
@@ -47,7 +47,8 @@ def setup_redshift_params(self):
             'host': 'hamburger_host',
             'port': '9999',
             'db_name': 'hamburger_db',
-            'ec2_user': 'hamburger_aws'
+            'ec2_user': 'hamburger_aws',
+            'read_access_user': 'some_read_only_user'
         }
         return redshift_params
 
@@ -285,7 +286,7 @@ def test_schema_publish(self, mock_session_helper, mock_create_schema):
             dataframe=dataframe, partitions=partitions, redshift_params=redshift_params)
 
         mock_create_schema.assert_called_once_with(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, redshift_params['read_access_user'])
 
     @patch('s3parq.publish_redshift.create_table')
     @patch('s3parq.publish_parq.SessionHelper')
@@ -375,7 +376,7 @@ def test_custom_publish_schema_publish(self, mock_session_helper, mock_create_sc
             custom_redshift_columns=custom_redshift_columns)
 
         mock_create_schema.assert_called_once_with(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, redshift_params['read_access_user'])
 
     @patch('s3parq.publish_redshift.create_custom_table')
     @patch('s3parq.publish_parq.SessionHelper')
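
Below is a minimal usage sketch (not part of the patch) of how a caller would supply the new `read_access_user` key, assuming the `publish()` signature in `s3parq/publish_parq.py` shown above; the bucket, key, and all parameter values are placeholders.

```python
# Hypothetical example of publishing with the new 'read_access_user' key.
# Every value below is a placeholder, not real infrastructure.
import pandas as pd

from s3parq.publish_parq import publish

# Small example dataframe with a column to partition on.
dataframe = pd.DataFrame({"order_id": [1, 2, 3], "region": ["us", "us", "eu"]})

redshift_params = {
    "schema_name": "my_schema",
    "table_name": "my_table",
    "iam_role": "arn:aws:iam::123456789012:role/my-spectrum-role",  # placeholder ARN
    "region": "us-east-1",
    "cluster_id": "my-cluster",
    "host": "my-cluster.example.us-east-1.redshift.amazonaws.com",  # placeholder host
    "port": "5439",
    "db_name": "my_db",
    "ec2_user": "my_ec2_user",
    # New in this change: the user granted USAGE on the schema, SELECT on its
    # current tables, and SELECT by default on tables created later.
    "read_access_user": "reporting_read_only",
}

publish(
    bucket="my-bucket",        # placeholder bucket
    key="path/to/dataset",     # placeholder key prefix
    partitions=["region"],
    dataframe=dataframe,
    redshift_params=redshift_params,
)
```

Because `create_schema()` defaults `read_access_user` to `None`, the grant block only runs when a user is supplied; note, however, that `validate_redshift_params()` now lists `read_access_user` among its expected keys, so any caller passing `redshift_params` must include it.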