[FEATURE] Update dataframe batch.validate workflow #10165
Conversation
✅ Deploy Preview for niobium-lead-7998 canceled.
Codecov Report
Attention: patch coverage details and impacted files:

@@            Coverage Diff             @@
##           develop    #10165    +/-   ##
===========================================
- Coverage    79.21%    79.20%   -0.02%
===========================================
  Files          456       456
  Lines        39720     39703      -17
===========================================
- Hits         31465     31445      -20
- Misses        8255      8258       +3
@@ -129,13 +129,7 @@ def _init_fluent_datasource(self, name: str, ds: FluentDatasource) -> FluentData
    datasource_name=name,
    data_asset_name=asset.name,
)
cached_data_asset = self._in_memory_data_assets.get(in_memory_asset_name)
This code block is removed because we no longer store the dataframe on the asset.
@@ -477,7 +479,11 @@ def _get_batch_metadata_from_batch_request(self, batch_request: BatchRequest) ->
batch_metadata = _ConfigurationSubstitutor().substitute_all_config_variables(
    data=batch_metadata, replace_variables_dict=config_variables
)
batch_metadata.update(copy.deepcopy(batch_request.options))
batch_metadata.update(
This was added since the dataframe is now always passed in via options and we don't want to copy it into the metadata (Spark will actually fail if we try). Previously, for runtime dataframes we forced batch_request.options to be {} and added a special argument for the dataframe, which broke the Liskov Substitution and arguably the Interface Segregation principles of SOLID.
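A minimal sketch of the filtering this comment describes, assuming the dataframe arrives under the "dataframe" key in batch_request.options (the function name and shape here are illustrative, not the exact implementation in this PR):

```python
import copy


def merge_options_into_metadata(batch_metadata: dict, options: dict) -> dict:
    # Copy every option into the batch metadata except the dataframe itself,
    # since deep-copying an in-memory (Spark) DataFrame is expensive or fails outright.
    batch_metadata.update(
        {key: copy.deepcopy(value) for key, value in options.items() if key != "dataframe"}
    )
    return batch_metadata
```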
@@ -994,7 +1000,7 @@ def __init__(  # noqa: PLR0913
def _create_id(self) -> str:
    options_list = []
    for key, value in self.batch_request.options.items():
        if key != "path":
        if key not in ("path", "dataframe"):
We don't want the dataframe to be part of the Batch id. We should probably make this configurable at the asset/batch definition level, since it seems strange for the top-level batch to know specifics about the different assets/batch definitions that produce it, but I am following the existing pattern here and we can refactor later.
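A rough, illustrative sketch of the kind of id construction being discussed, assuming only that the options dict may contain "path" and "dataframe" keys (this is not the actual GX implementation):

```python
import hashlib
import json


def create_batch_id(batch_request_options: dict) -> str:
    # Build a deterministic id from the batch request options, skipping keys whose
    # values are not meaningful identifiers: the file path and the dataframe itself.
    options_list = [
        f"{key}={value}"
        for key, value in sorted(batch_request_options.items())
        if key not in ("path", "dataframe")
    ]
    return hashlib.md5(json.dumps(options_list).encode()).hexdigest()
```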
@@ -356,21 +355,13 @@ def _short_id() -> str:
    return str(uuid.uuid4()).replace("-", "")[:11]


class DataFrameAsset(_PandasDataAsset, Generic[_PandasDataFrameT]):
class DataFrameAsset(_PandasDataAsset):
We remove the dataframe from the asset since it now needs to be passed in when making the batch definition. This lets us remove all the dataframe logic and this Generic from the asset.
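A minimal usage sketch of the new shape, assuming a context that already contains the pandas datasource and dataframe asset used in the test below (datasource and asset names are taken from that test; this is illustrative, not the full API surface):

```python
import pandas as pd
import great_expectations as gx

df = pd.DataFrame({"a": [1, 2, 3]})

context = gx.get_context()
asset = context.get_datasource(datasource_name="fluent_pandas_datasource").get_asset(
    asset_name="my_df_asset"
)

# The dataframe is no longer stored on the asset; it travels with the batch request.
batch_request = asset.build_batch_request(options={"dataframe": df})
```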
context = gx.get_context(context_root_dir=context.root_directory, cloud_mode=False)
dataframe_asset = context.get_datasource(datasource_name="fluent_pandas_datasource").get_asset(
    asset_name="my_df_asset"
)
_ = dataframe_asset.build_batch_request(dataframe=df)
assert dataframe_asset.dataframe.equals(df)  # type: ignore[attr-defined] # _PandasDataFrameT
reloaded_batch_def = dataframe_asset.get_batch_definition(batch_definition_name="bd")
We should change the argument to name instead of batch_definition_name. I can do that in a separate PR.
I actually see both patterns in the code depending on the domain object. I'd prefer these to all be name, since we already know the type of the object from the method name, e.g. get_<domain_object_type>.
assert new_batch.data.dataframe.toPandas().equals(df)


@dataclass
From here to the end of the file are new e2e tests that verify the validation workflows for batches and validation_definitions.
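A condensed sketch of what such an end-to-end check might look like, written against the 1.0-style fluent API; the datasource, asset, and expectation names are assumptions and may differ from the code on this branch:

```python
import pandas as pd
import great_expectations as gx
import great_expectations.expectations as gxe


def test_batch_validate_whole_dataframe():
    df = pd.DataFrame({"passenger_count": [1, 2, 3]})

    context = gx.get_context()
    asset = context.data_sources.add_pandas(name="my_datasource").add_dataframe_asset(
        name="my_asset"
    )
    batch_definition = asset.add_batch_definition_whole_dataframe("my_batch_definition")

    # The dataframe is supplied at runtime, not stored on the asset.
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

    result = batch.validate(
        gxe.ExpectColumnValuesToBeBetween(column="passenger_count", min_value=0)
    )
    assert result.success
```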
This looks good to me!
@@ -44,7 +44,7 @@

# Python
# <snippet name="docs/docusaurus/docs/snippets/get_existing_data_asset_from_existing_datasource_pandas_filesystem_example.py build_batch_request_with_dataframe">
my_batch_request = my_asset.build_batch_request(dataframe=dataframe)
my_batch_request = my_asset.build_batch_request(options={"dataframe": dataframe})
First impression here: does this interface put us in a place where things are less predictable? Should there just be a different method for building a batch request via dataframe?
Hmm, I like the unified method call; perhaps we can add a specialized method later if the need arises?
We need to have a better runtime story; this is a stopgap for the short term. I think options and the other parameters currently exist on the call but it throws if you pass them, so this change adds consistency. It also prevents you from adding dataframes onto the asset, which had some weird consequences. But overall, I agree that this isn't ideal.
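If we did later want the specialized entry point suggested above, it could stay a thin wrapper over the unified call; a hypothetical sketch (the name build_batch_request_from_dataframe does not exist in the codebase):

```python
# Hypothetical convenience wrapper: keeps the options-based call as the single
# code path while giving users an explicit, discoverable method for dataframes.
def build_batch_request_from_dataframe(asset, dataframe):
    return asset.build_batch_request(options={"dataframe": dataframe})
```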
dataframe: SparkDataFrame


def _validate_whole_dataframe_batch_definition(
nit: it feels weird to have this method defined in between its uses. Obviously non-blocking.
Slightly more of a nit, but also not blocking: this pattern of putting the asserts and other non-setup logic in a helper function makes the test somewhat harder to reason about, and likely invites overloading even more logic into the helper. One thing I'd prefer in tests: asserts live in the test itself, not in a helper method. I don't want to have to read the helper implementation to know what expectation my data is tested against.
Yes, these tests should really be parametrized and the helper functions should be inlined. I can follow up with that fix.
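A sketch of the parametrized, asserts-inline shape being suggested; the batch_definition fixture and the expectation used are hypothetical placeholders:

```python
import pandas as pd
import pytest
import great_expectations.expectations as gxe


@pytest.mark.parametrize(
    "column,min_value,expected_success",
    [
        ("passenger_count", 0, True),
        ("passenger_count", 10, False),
    ],
)
def test_batch_validate(column, min_value, expected_success, batch_definition):
    # batch_definition is a hypothetical fixture that wires up the datasource/asset.
    df = pd.DataFrame({"passenger_count": [1, 2, 3]})
    batch = batch_definition.get_batch(batch_parameters={"dataframe": df})

    result = batch.validate(
        gxe.ExpectColumnValuesToBeBetween(column=column, min_value=min_value)
    )

    # The assert lives in the test itself, not in a helper.
    assert result.success == expected_success
```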
The shape of the API we are implementing is described in:

invoke lint (uses ruff format + ruff check)

For more information about contributing, see Contribute.
After you submit your PR, keep the page open and monitor the statuses of the various checks made by our continuous integration process at the bottom of the page. Please fix any issues that come up and reach out on Slack if you need help. Thanks for contributing!