UV package management and Snowflake logging extras #35
- update project def
- in workflows, install uv and install packages through it (cache slower than reinstall)
- init uv.lock with existing deps

- logs start/end of snowflake command
- optionally display or mask parameters in command
- display command on failure
- extended connector block for wrapping the execute method
Looks good. I've got a couple of questions and propositions before approving.
src/prefecto/ext/snowflake.py
Outdated
        except Exception as e:
            if __python_version__.major > 3 or (
                __python_version__.major == 3 and __python_version__.minor >= 11
            ):
                e.add_note(f"Command failed\n{formatted_command_string}")
            logger.error("Command failed.", exc_info=e, stack_info=True)
            raise e
I think we can change it to this to avoid checking the versions:
if hasattr(e, "add_note") and callable(e.add_note):
e.add_note("Command failed."...)
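A minimal sketch of the feature-detection idea above, not the code from this PR. The helper name `run_with_note` and its arguments are hypothetical; the only assumption about the runtime is that `add_note` is available on exceptions from Python 3.11 onward.

```python
def run_with_note(action, formatted_command):
    """Run `action`; on failure, attach the command text as a note if supported."""
    try:
        return action()
    except Exception as e:
        # Exceptions gained add_note() in Python 3.11. Detecting the method
        # avoids comparing interpreter version numbers by hand.
        add_note = getattr(e, "add_note", None)
        if callable(add_note):
            add_note(f"Command failed\n{formatted_command}")
        # A bare raise re-raises the active exception with its traceback intact.
        raise
```

On interpreters older than 3.11 the exception is re-raised unchanged, so callers see the same error either way.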
src/prefecto/ext/snowflake.py
Outdated
        [Snowflake Python connector documentation](https://docs.snowflake.com/en/user-guide/python-connector-api.html#connect).

        Args:
            **connect_kwargs: Additional arguments to pass to
                `snowflake.connector.connect`.

        Returns:
            An authenticated Snowflake connection.

        Example:
            Get Snowflake connection with only block configuration:
            ```python
            from prefecto.ext.snowflake import PrefectoSnowflakeCredentials as SnowflakeCredentials

            snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

            connection = snowflake_credentials_block.get_client()
            ```

            Get Snowflake connector scoped to a specified database:
            ```python
            from prefecto.ext.snowflake import PrefectoSnowflakeCredentials as SnowflakeCredentials

            snowflake_credentials_block = SnowflakeCredentials.load("BLOCK_NAME")

            connection = snowflake_credentials_block.get_client(database="my_database")
            ```
        """  # noqa
        connect_params = {
            # required to track task's usage in the Snowflake Partner Network Portal
            "application": "Prefect_Snowflake_Collection",
            **self.credentials.model_dump(
                exclude_unset=True, exclude={"block_type_slug"}
            ),
            **connect_kwargs,
        }

        for key, value in connect_params.items():
            if isinstance(value, (SecretStr, SecretBytes)):
                connect_params[key] = connect_params[key].get_secret_value()

        # set authenticator to the actual okta_endpoint
        if connect_params.get("authenticator") == "okta_endpoint":
            endpoint = connect_params.pop("endpoint", None) or connect_params.pop(
                "okta_endpoint", None
            )  # okta_endpoint is deprecated
            connect_params["authenticator"] = endpoint

        private_der_key = self.credentials.resolve_private_key()
        if private_der_key is not None:
            connect_params["private_key"] = private_der_key
            connect_params.pop("password", None)
            connect_params.pop("private_key_passphrase", None)

        return PrefectoSnowflakeConnection(**connect_params)

    def get_connection(self, **connect_kwargs: Any) -> PrefectoSnowflakeConnection:
        """
        Returns an authenticated connection that can be
        used to command from Snowflake databases.

        Args:
            **connect_kwargs: Additional arguments to pass to
                `snowflake.connector.connect`.

        Returns:
            The authenticated SnowflakeConnection.

        Examples:
            ```python
            from prefecto.ext.snowflake import PrefectoSnowflakeCredentials as SnowflakeCredentials
            from prefecto.ext.snowflake import PrefectoSnowflakeConnector as SnowflakeConnector

            snowflake_credentials = SnowflakeCredentials(
                account="account",
                user="user",
                password="password",
            )
            snowflake_connector = SnowflakeConnector(
                database="database",
                warehouse="warehouse",
                schema="schema",
                credentials=snowflake_credentials
            )
            with snowflake_connector.get_connection() as connection:
                ...
            ```
        """
        if self._connection is not None:
            return self._connection

        connect_params = {
            "database": self.database,
            "warehouse": self.warehouse,
            "schema": self.schema_,
        }
        connection = self._credentials_get_client(**connect_kwargs, **connect_params)
        self._connection = connection
        self.logger.info("Started a new connection to Snowflake.")
        return connection
Correct me if I'm wrong, but this one follows the same logic found in `prefect_snowflake`, just injecting the custom `PrefectoSnowflakeConnection` at the end of `_credentials_get_client`, whereas `prefect_snowflake` calls `snowflake.connector.connect(**connect_params)` instead. Are there other mechanisms that `prefect_snowflake` provides to accomplish this without rewriting the same logic? I'd rather use `PrefectoSnowflakeCursor` directly with the call to `cursor(PrefectoSnowflakeCursor)` on the original connection object returned from `SnowflakeConnector`. Otherwise, one possible approach that I can think of to remove the duplicate logic is to use composition over inheritance.
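To make the composition suggestion concrete, here is a rough sketch under stated assumptions. `CursorFactory` is a hypothetical name, not something in prefecto or `prefect_snowflake`. It assumes only that the wrapped connector exposes `get_connection()` and that the returned connection accepts a cursor class in `cursor(...)`.

```python
class CursorFactory:
    """Holds a connector (composition) and hands out cursors of a chosen class."""

    def __init__(self, connector, cursor_cls):
        # The connector is stored, not subclassed, so none of its
        # credential-handling logic needs to be copied here.
        self._connector = connector
        self._cursor_cls = cursor_cls

    def cursor(self):
        # Connection setup stays with the wrapped connector; this class only
        # chooses which cursor class the connection should instantiate.
        connection = self._connector.get_connection()
        return connection.cursor(self._cursor_cls)
```

The wrapper never touches connection parameters, so upstream changes to how credentials are resolved would not need to be mirrored.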
I like that idea more. I removed the relationship to prefect-snowflake and focused on keeping this a cursor class like `DictCursor`. It raises the question of "why is this in prefecto?", but it will be used alongside Prefect enough to be worth it as an extension.
from snowflake.connector import connect
from prefecto.ext.snowflake import LogCursor

with connect(...).cursor(LogCursor) as cursor:
    cursor.execute(
        "SELECT * FROM table WHERE id = %(id)s AND secret = %(secret)s",
        params={"id": 123, "secret": "shhh"},
        obfuscate_params=["secret"],
        command_id="secret-selection",
    )

INFO - [secret-selection] Beginning command.
DEBUG - [secret-selection] Executing command:
SELECT * FROM table WHERE id = 123 AND secret = ****
INFO - [secret-selection] Command executed successfully.
This approach emphasizes just using the Snowflake connection object's `.cursor(cursor_cls)` functionality.
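For illustration, the masking shown in the log output above could be produced along these lines. This is a sketch, not the `LogCursor` implementation: `format_command` is a hypothetical helper, and it assumes pyformat-style (`%(name)s`) placeholders with a dict of parameters. It renders text for logging only and does not replace the connector's own parameter binding.

```python
def format_command(command, params=None, obfuscate_params=()):
    """Render a pyformat-style command for log display, masking chosen parameters."""
    if not params:
        return command
    # Swap each sensitive value for a fixed mask before interpolating, so the
    # real value never reaches the log message.
    display = {
        key: "****" if key in obfuscate_params else value
        for key, value in params.items()
    }
    return command % display
```

Called with the command and parameters from the example above and `obfuscate_params=["secret"]`, this yields the `secret = ****` line shown in the log.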
Looks good!
Two-part PR
Snowflake Logging
Gives `prefect_snowflake.SnowflakeConnector` users the ability to log commands executed by the cursor, either when a command fails or while debugging. This is particularly useful for pipelines generating SQL on the fly.
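A rough sketch of the logging behaviour described here, assuming only a DB-API style cursor with an `execute` method. The function `execute_logged` and the logger name are hypothetical and are not taken from the PR.

```python
import logging

logger = logging.getLogger("snowflake_logging_sketch")


def execute_logged(cursor, command, command_id="command"):
    """Execute `command` on the cursor, logging its start, end, and any failure."""
    logger.info("[%s] Beginning command.", command_id)
    # The full command text is logged at DEBUG level only.
    logger.debug("[%s] Executing command:\n%s", command_id, command)
    try:
        result = cursor.execute(command)
    except Exception:
        # On failure, log the command at ERROR level with the traceback so
        # SQL generated on the fly is visible next to the error.
        logger.exception("[%s] Command failed:\n%s", command_id, command)
        raise
    logger.info("[%s] Command executed successfully.", command_id)
    return result
```

Because the command text is logged at DEBUG on the normal path and at ERROR on failure, routine runs stay quiet while failed runs still record what was executed.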