Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grant user access to schema #81

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ A lot of pre-filtering involves trimming down your dataset based on the values a
key,
partition)

## To Test
```
bash dev_env --build
pytest tests/
```

## Redshift Spectrum
Dataframes published to S3 can optionally be queried in AWS Redshift Spectrum. As a prerequisite, you must have an external database configured in Redshift; see the [AWS docs](https://docs.aws.amazon.com/redshift/latest/dg/c-using-spectrum.html) for help setting up a database in Redshift. To enable this functionality in S3parq, simply pass a dictionary of configurations to `publish()` via the `redshift_params` argument.

Expand Down
8 changes: 4 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pandas==0.24.2
pyarrow==0.13.0
boto3==1.9.177
s3fs==0.2.1
pandas==1.2.4
pyarrow==4.0.0
boto3==1.17.58
s3fs==0.4.2
dfmock==0.0.14
moto==1.3.8
psycopg2==2.8.3
Expand Down
8 changes: 4 additions & 4 deletions s3parq/publish_parq.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def log_size_estimate(num_bytes):
yield {'lower': lower, 'upper': upper}


def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, redshift_params: dict = None) -> List[str]:
def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, redshift_params: dict = None, read_access_user: str =None) -> List[str]:
jacobtobias marked this conversation as resolved.
Show resolved Hide resolved
""" Dataframe to S3 Parquet Publisher
This function handles the portion of work that will see a dataframe converted
to parquet and then published to the given S3 location.
Expand Down Expand Up @@ -513,7 +513,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram

session_helper.configure_session_helper()
publish_redshift.create_schema(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, read_access_user)
logger.debug(
f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")

Expand Down Expand Up @@ -553,7 +553,7 @@ def publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFram

return files

def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, custom_redshift_columns: dict, redshift_params: dict = None) -> List[str]:
def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.DataFrame, custom_redshift_columns: dict, redshift_params: dict = None, read_access_user: str =None) -> List[str]:
""" Dataframe to S3 Parquet Publisher with a CUSTOM redshift column definition.
Custom publish allows custom defined redshift column definitions to be used and
enables support for Redshift's decimal data type.
Expand Down Expand Up @@ -618,7 +618,7 @@ def custom_publish(bucket: str, key: str, partitions: List[str], dataframe: pd.D

session_helper.configure_session_helper()
publish_redshift.create_schema(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], session_helper, read_access_user)
logger.debug(
f"Schema {redshift_params['schema_name']} created. Creating table {redshift_params['table_name']}...")

Expand Down
9 changes: 8 additions & 1 deletion s3parq/publish_redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def _datatype_mapper(columns: dict) -> dict:
return f"({sql_statement[:-2]})" # Slice off the last space and comma


def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper) -> None:
def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper: SessionHelper, read_access_user=None) -> None:
jacobtobias marked this conversation as resolved.
Show resolved Hide resolved
""" Creates a schema in AWS redshift using a given iam_role

Args:
Expand All @@ -198,6 +198,13 @@ def create_schema(schema_name: str, db_name: str, iam_role: str, session_helper:
logger.info(f'Running query to create schema: {new_schema_query}')
scope.execute(new_schema_query)

if read_access_user:
grant_access_query = f"GRANT USAGE ON SCHEMA {schema_name} TO {read_access_user};\
GRANT SELECT ON ALL TABLES IN SCHEMA {schema_name} TO {read_access_user};\
ALTER DEFAULT PRIVILEGES IN SCHEMA {schema_name} GRANT SELECT ON TABLES TO {read_access_user};"
logger.info(f'Running query to grant access to schema: {grant_access_query}')
scope.execute(grant_access_query)


def create_table(table_name: str, schema_name: str, columns: dict, partitions: dict, path: str, session_helper: SessionHelper) -> None:
""" Creates a table in AWS redshift. The table will be named
Expand Down
4 changes: 2 additions & 2 deletions tests/test_publish_parq.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def test_schema_publish(self, mock_session_helper, mock_create_schema):
dataframe=dataframe, partitions=partitions, redshift_params=redshift_params)

mock_create_schema.assert_called_once_with(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, None)

@patch('s3parq.publish_redshift.create_table')
@patch('s3parq.publish_parq.SessionHelper')
Expand Down Expand Up @@ -375,7 +375,7 @@ def test_custom_publish_schema_publish(self, mock_session_helper, mock_create_sc
custom_redshift_columns=custom_redshift_columns)

mock_create_schema.assert_called_once_with(
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh)
redshift_params['schema_name'], redshift_params['db_name'], redshift_params['iam_role'], msh, None)

@patch('s3parq.publish_redshift.create_custom_table')
@patch('s3parq.publish_parq.SessionHelper')
Expand Down