Validate dataframe before sending it to Kafka for online storage #391
Comments
I tried this: economy_fg.insert(economy_bulk_insert_df, storage="online"). This is the error you get when you try to insert a dataframe with a different set of features only to the online feature store. Isn't that explanatory enough?
An error was encountered:
cannot resolve '`year`' given input columns: [car, commission, hvalue, hyears, id, loan, salary];
'Project [id#6528, salary#6529, commission#6530, car#6531, hvalue#6532, hyears#6533, loan#6534, 'year]
+- Project [id#6514 AS id#6528, salary#6515 AS salary#6529, commission#6516 AS commission#6530, car#6517 AS car#6531, hvalue#6518 AS hvalue#6532, hyears#6519 AS hyears#6533, loan#6520 AS loan#6534]
+- LogicalRDD [id#6514, salary#6515, commission#6516, car#6517, hvalue#6518, hyears#6519, loan#6520], false
Traceback (most recent call last):
File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/feature_group.py", line 791, in insert
write_options,
File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/core/feature_group_engine.py", line 104, in insert
validation_id,
File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 176, in save_dataframe
self._save_online_dataframe(feature_group, dataframe, online_write_options)
File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 264, in _save_online_dataframe
feature_group, self._encode_complex_features(feature_group, dataframe)
File "/srv/hops/anaconda/envs/theenv/lib/python3.7/site-packages/hsfs/engine/spark.py", line 279, in _encode_complex_features
for field in json.loads(feature_group.avro_schema)["fields"]
File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1669, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/srv/hops/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/srv/hops/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: cannot resolve '`year`' given input columns: [car, commission, hvalue, hyears, id, loan, salary];
'Project [id#6528, salary#6529, commission#6530, car#6531, hvalue#6532, hyears#6533, loan#6534, 'year]
+- Project [id#6514 AS id#6528, salary#6515 AS salary#6529, commission#6516 AS commission#6530, car#6517 AS car#6531, hvalue#6518 AS hvalue#6532, hyears#6519 AS hyears#6533, loan#6520 AS loan#6534]
+- LogicalRDD [id#6514, salary#6515, commission#6516, car#6517, hvalue#6518, hyears#6519, loan#6520], false
We currently don't validate the dataframe before sending it to Kafka to be stored in the online feature store.
If the user writes only to the online feature store with a dataframe whose schema differs from the feature group's, the data won't be inserted, but the user won't get any feedback on why.
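As a rough illustration, the validation could compare the dataframe's columns against the fields of the feature group's Avro schema (exposed as `feature_group.avro_schema` in the traceback above) and fail with a descriptive message instead of letting Spark raise an unresolved-column `AnalysisException`. This is a hedged sketch, not the hsfs implementation; the function name `validate_schema` and the case-insensitive comparison are assumptions for illustration only.

```python
import json

def validate_schema(avro_schema: str, dataframe_columns: list) -> None:
    """Raise a descriptive error if the dataframe columns do not match
    the feature group's Avro schema fields (hypothetical helper).

    Column names are compared case-insensitively here, mirroring Spark's
    default case-insensitive column resolution (an assumption).
    """
    expected = {f["name"].lower() for f in json.loads(avro_schema)["fields"]}
    actual = {c.lower() for c in dataframe_columns}
    missing = expected - actual
    extra = actual - expected
    if missing or extra:
        raise ValueError(
            "Dataframe schema does not match the feature group schema: "
            f"missing columns {sorted(missing)}, "
            f"unexpected columns {sorted(extra)}"
        )

# Example mirroring the error above: the feature group expects a `year`
# column that the dataframe being inserted does not have.
schema = json.dumps({
    "type": "record",
    "name": "economy_fg",
    "fields": [
        {"name": n, "type": "int"}
        for n in ["id", "salary", "commission", "car",
                  "hvalue", "hyears", "loan", "year"]
    ],
})
try:
    validate_schema(schema, ["id", "salary", "commission", "car",
                             "hvalue", "hyears", "loan"])
except ValueError as e:
    print(e)  # reports that `year` is missing
```

Run before the Kafka write, a check like this would surface the mismatch immediately, with the offending column names, rather than failing deep inside Spark's projection.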