From 5b1bddb6b881ee4250c8356592ed2790629ec33e Mon Sep 17 00:00:00 2001
From: "Jacob S. Tobias"
Date: Sat, 17 Apr 2021 10:21:20 -0400
Subject: [PATCH 1/5] add sql to grant permissions on schema to user

---
 s3parq/publish_redshift.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/s3parq/publish_redshift.py b/s3parq/publish_redshift.py
index 679feb4..aa8f73f 100644
--- a/s3parq/publish_redshift.py
+++ b/s3parq/publish_redshift.py
@@ -178,7 +178,7 @@ def _datatype_mapper(columns: dict) -> dict:
     return f"({sql_statement[:-2]})"  # Slice off the last space and comma
 
 
-def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper) -> None:
+def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper, read_access_user=None) -> None:
     """ Creates a schema in AWS redshift using a given iam_role
 
     Args:
@@ -198,6 +198,13 @@ def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper:
         logger.info(f'Running query to create schema: {new_schema_query}')
         scope.execute(new_schema_query)
 
+        if read_access_user:
+            grant_access_query = f"GRANT USAGE ON SCHEMA {schema_name} TO {read_access_user};\
+                GRANT SELECT ON ALL TABLES IN SCHEMA {schema_name} TO {read_access_user};\
+                ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_name} GRANT SELECT ON TABLES TO {read_access_user};"
+            logger.info(f'Running query to grant access to schema: {grant_access_query}')
+            scope.execute(grant_access_query)
+
 
 def create_table(table_name: str, schema_name: str, columns: dict, partitions: dict, path: str, session_helper: SessionHelper) -> None:
     """ Creates a table in AWS redshift. The table will be named

From e9420808c69be03005133b93225aa7936f640d04 Mon Sep 17 00:00:00 2001
From: "Jacob S. Tobias"
Date: Sat, 17 Apr 2021 10:24:32 -0400
Subject: [PATCH 2/5] add documentation on how to run tests

---
 README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/README.md b/README.md
index f7cf94e..4f7b594 100644
--- a/README.md
+++ b/README.md
@@ -77,6 +77,12 @@ A lot of pre-filtering involves trimming down your dataset based on the values
     a key,
     partition)
 
+## To Test
+```
+bash dev_env --build
+pytest tests/
+```
+
 ## Redshift Spectrum
 
 Dataframes published to S3 can optionally be queried in AWS Redshift Spectrum. To enable this functionality, you must have an external database configured in Redshift. See the [AWS docs](https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html) for help setting up a database in Redshift. To enable this functionality in S3parq, simply pass a dictionary of configurations to `publish()` via the redshift_params argument.

From 95b08217f89181dda760aa7b89ead662ecdeb902 Mon Sep 17 00:00:00 2001
From: "Jacob S. Tobias"
Date: Mon, 26 Apr 2021 20:08:48 -0400
Subject: [PATCH 3/5] update packages to get the tests working again

---
 requirements.txt | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 996f3eb..f9bc1e7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
-pandas==0.24.2
-pyarrow==0.13.0
-boto3==1.9.177
-s3fs==0.2.1
+pandas==1.2.4
+pyarrow==4.0.0
+boto3==1.17.58
+s3fs==0.4.2
 dfmock==0.0.14
 moto==1.3.8
 psycopg2==2.8.3

From de4990c2f62c781fa96e45182fdea0a4837bb0fb Mon Sep 17 00:00:00 2001
From: "Jacob S. Tobias"
Date: Mon, 26 Apr 2021 20:22:17 -0400
Subject: [PATCH 4/5] add read_access_user arg to publish methods

---
 s3parq/publish_parq.py     | 8 ++++----
 tests/test_publish_parq.py | 4 ++--
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/s3parq/publish_parq.py b/s3parq/publish_parq.py
index fc52f94..d269b39 100644
--- a/s3parq/publish_parq.py
+++ b/s3parq/publish_parq.py
@@ -459,7 +459,7 @@ def log_size_estimate(num_bytes):
         yield {'lower': lower, 'upper': upper}
 
 
-def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, redshift_params: dict = None) -> List[str]:
+def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, redshift_params: dict = None, read_access_user: str = None) -> List[str]:
     """ Dataframe to S3 Parquet Publisher
     This function handles the portion of work that will see a dataframe converted to
     parquet and then published to the given S3 location.
@@ -513,7 +513,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram
 
             session_helper.configure_session_helper()
             publish_redshift.create_schema(
-                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
+                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, read_access_user)
             logger.debug(
                 f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")
 
@@ -553,7 +553,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram
     return files
 
 
-def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, custom_redshift_columns: dict, redshift_params: dict = None) -> List[str]:
+def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, custom_redshift_columns: dict, redshift_params: dict = None, read_access_user: str = None) -> List[str]:
     """ Dataframe to S3 Parquet Publisher with a CUSTOM redshift column definition.
     Custom publish allows custom defined redshift column definitions to be used and
     enables support for Redshift's decimal data type.
@@ -618,7 +618,7 @@ def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.D
 
             session_helper.configure_session_helper()
             publish_redshift.create_schema(
-                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
+                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, read_access_user)
             logger.debug(
                 f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")

diff --git a/tests/test_publish_parq.py b/tests/test_publish_parq.py
index 1190cdd..752af15 100644
--- a/tests/test_publish_parq.py
+++ b/tests/test_publish_parq.py
@@ -285,7 +285,7 @@ def test_schema_publish(self, mock_session_helper, mock_create_schema):
                 dataframe=dataframe, partitions=partitions, redshift_params=redshift_params)
 
         mock_create_schema.assert_called_once_with(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, None)
 
     @patch('s3parq.publish_redshift.create_table')
     @patch('s3parq.publish_parq.SessionHelper')
@@ -375,7 +375,7 @@ def test_custom_publish_schema_publish(self, mock_session_helper, mock_create_sc
             custom_redshift_columns=custom_redshift_columns)
 
         mock_create_schema.assert_called_once_with(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, None)
 
     @patch('s3parq.publish_redshift.create_custom_table')
     @patch('s3parq.publish_parq.SessionHelper')

From 06984ea23a5aa71fe272d2d98a59493c17c06d2a Mon Sep 17 00:00:00 2001
From: "Jacob S. Tobias"
Date: Mon, 10 May 2021 07:32:09 -0400
Subject: [PATCH 5/5] move read_access_user to redshift_params dict and update
 tests to accommodate new param

---
 s3parq/publish_parq.py     | 11 ++++++-----
 tests/test_publish_parq.py |  7 ++++---
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/s3parq/publish_parq.py b/s3parq/publish_parq.py
index d269b39..2fb5271 100644
--- a/s3parq/publish_parq.py
+++ b/s3parq/publish_parq.py
@@ -110,6 +110,7 @@ def validate_redshift_params(redshift_params: dict) -> dict:
             - port (str): Redshift Spectrum port to use
             - db_name (str): Redshift Spectrum database name to use
             - ec2_user (str): If on ec2, the user that should be used
+            - read_access_user (str): Name of the user to be granted read access on the schema
 
     Returns:
         The given redshift_params, with table and schema names lowercase
@@ -120,7 +121,7 @@ def validate_redshift_params(redshift_params: dict) -> dict:
         ValueError: If redshift_params is missing any of the above attributes
     """
     expected_params = ["schema_name", "table_name", "iam_role",
-                       "region", "cluster_id", "host", "port", "db_name", "ec2_user"]
+                       "region", "cluster_id", "host", "port", "db_name", "ec2_user", "read_access_user"]
     logger.debug("Checking redshift params are correctly formatted")
     if len(redshift_params) != len(expected_params):
         params_length_message = f"Expected parameters: {len(expected_params)}. Received: {len(redshift_params)}"
@@ -459,7 +460,7 @@ def log_size_estimate(num_bytes):
         yield {'lower': lower, 'upper': upper}
 
 
-def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, redshift_params: dict = None, read_access_user: str = None) -> List[str]:
+def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, redshift_params: dict = None) -> List[str]:
     """ Dataframe to S3 Parquet Publisher
     This function handles the portion of work that will see a dataframe converted to
     parquet and then published to the given S3 location.
@@ -513,7 +514,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram
 
             session_helper.configure_session_helper()
             publish_redshift.create_schema(
-                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, read_access_user)
+                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, redshift_params['read_access_user'])
             logger.debug(
                 f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")
 
@@ -553,7 +554,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram
     return files
 
 
-def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, custom_redshift_columns: dict, redshift_params: dict = None, read_access_user: str = None) -> List[str]:
+def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, custom_redshift_columns: dict, redshift_params: dict = None) -> List[str]:
     """ Dataframe to S3 Parquet Publisher with a CUSTOM redshift column definition.
     Custom publish allows custom defined redshift column definitions to be used and
     enables support for Redshift's decimal data type.
@@ -618,7 +619,7 @@ def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.D
 
             session_helper.configure_session_helper()
             publish_redshift.create_schema(
-                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, read_access_user)
+                redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, redshift_params['read_access_user'])
             logger.debug(
                 f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")

diff --git a/tests/test_publish_parq.py b/tests/test_publish_parq.py
index 752af15..693fbcc 100644
--- a/tests/test_publish_parq.py
+++ b/tests/test_publish_parq.py
@@ -47,7 +47,8 @@ def setup_redshift_params(self):
             'host': 'hamburger_host',
             'port': '9999',
             'db_name': 'hamburger_db',
-            'ec2_user': 'hamburger_aws'
+            'ec2_user': 'hamburger_aws',
+            'read_access_user': 'some_read_only_user'
         }
 
         return redshift_params
@@ -285,7 +286,7 @@ def test_schema_publish(self, mock_session_helper, mock_create_schema):
                 dataframe=dataframe, partitions=partitions, redshift_params=redshift_params)
 
         mock_create_schema.assert_called_once_with(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, None)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, redshift_params['read_access_user'])
 
     @patch('s3parq.publish_redshift.create_table')
     @patch('s3parq.publish_parq.SessionHelper')
@@ -375,7 +376,7 @@ def test_custom_publish_schema_publish(self, mock_session_helper, mock_create_sc
             custom_redshift_columns=custom_redshift_columns)
 
         mock_create_schema.assert_called_once_with(
-            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, None)
+            redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, redshift_params['read_access_user'])
 
     @patch('s3parq.publish_redshift.create_custom_table')
     @patch('s3parq.publish_parq.SessionHelper')
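
Reviewer note: a minimal usage sketch of the API as it stands after PATCH 5/5, where `read_access_user` lives inside `redshift_params`. This is not part of the patches; the bucket, ARN, cluster, host, and user values below are hypothetical placeholders, and the import path follows the `s3parq.publish_parq` module the tests patch.

```python
# Hypothetical end-to-end call after this series is applied.
import pandas as pd

from s3parq.publish_parq import publish

# validate_redshift_params() compares the dict against exactly this key set,
# now including read_access_user. All values here are placeholders.
redshift_params = {
    'schema_name': 'my_schema',
    'table_name': 'my_table',
    'iam_role': 'arn:aws:iam::123456789012:role/my-spectrum-role',  # hypothetical ARN
    'region': 'us-east-1',
    'cluster_id': 'my-cluster',
    'host': 'my-cluster.example.us-east-1.redshift.amazonaws.com',
    'port': '5439',
    'db_name': 'my_db',
    'ec2_user': 'my_ec2_user',
    # User granted USAGE on the schema plus SELECT on all current and future
    # tables in it (see the GRANT / ALTER DEFAULT PRIVILEGES in PATCH 1/5).
    'read_access_user': 'reporting_ro',
}

dataframe = pd.DataFrame({'id': [1, 2, 3], 'grouping': ['a', 'b', 'a']})

# Publishes partitioned parquet to S3, registers the schema and table in
# Redshift Spectrum, and grants read access to reporting_ro along the way.
files = publish(bucket='my-bucket',
                key='path/to/dataset',
                partitions=['grouping'],
                dataframe=dataframe,
                redshift_params=redshift_params)
```

One consequence of the length check in `validate_redshift_params`: existing callers that pass `redshift_params` will need to add the `read_access_user` key after this series, setting it to `None` if no grants are wanted (since `create_schema` only runs the grant statements when the value is truthy).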