From f304f9264ca83e7c53e2dae722c8fecf9186af16 Mon Sep 17 00:00:00 2001 From: Moritz Meister <8422705+moritzmeister@users.noreply.github.com> Date: Mon, 30 Jan 2023 22:09:06 +0100 Subject: [PATCH] [FSTORE-639] Pin SQLalchemy version or upgrade to new 2.0.0 version (#922) --- python/hsfs/core/vector_server.py | 6 +++--- python/hsfs/engine/python.py | 3 ++- python/tests/engine/test_python.py | 6 ++++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py index 892475f4fd..2a884b2eff 100644 --- a/python/hsfs/core/vector_server.py +++ b/python/hsfs/core/vector_server.py @@ -176,7 +176,7 @@ def get_feature_vector(self, entry, passed_features={}): for row in result_proxy: result_dict = self.deserialize_complex_features( - self._complex_features, dict(row.items()) + self._complex_features, row._asdict() ) if not result_dict: raise Exception( @@ -221,13 +221,13 @@ def get_feature_vectors(self, entry, passed_features=[]): result_proxy = mysql_conn.execute( prepared_statement, - batch_ids=entry_values_tuples, + {"batch_ids": entry_values_tuples}, ).fetchall() statement_results = [] for row in result_proxy: result_dict = self.deserialize_complex_features( - self._complex_features, dict(row.items()) + self._complex_features, row._asdict() ) if not result_dict: diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 73bc59071e..b276d10311 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -39,6 +39,7 @@ from confluent_kafka import Producer, KafkaError from tqdm.auto import tqdm from botocore.response import StreamingBody +from sqlalchemy import sql from hsfs import client, feature, util from hsfs.client.exceptions import FeatureStoreException @@ -114,7 +115,7 @@ def _jdbc(self, sql_query, connector, dataframe_type, read_options, schema=None) ), ) with self._mysql_online_fs_engine.connect() as mysql_conn: - result_df = pd.read_sql(sql_query, mysql_conn) + result_df = pd.read_sql(sql.text(sql_query), mysql_conn) if schema: result_df = Engine.cast_columns(result_df, schema, online=True) return self._return_dataframe_type(result_df, dataframe_type) diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py index 7546ad1773..39b6414fc0 100644 --- a/python/tests/engine/test_python.py +++ b/python/tests/engine/test_python.py @@ -110,12 +110,13 @@ def test_jdbc(self, mocker): mock_python_engine_return_dataframe_type = mocker.patch( "hsfs.engine.python.Engine._return_dataframe_type" ) + query = "SELECT * FROM TABLE" python_engine = python.Engine() # Act python_engine._jdbc( - sql_query=None, connector=None, dataframe_type=None, read_options={} + sql_query=query, connector=None, dataframe_type=None, read_options={} ) # Assert @@ -130,12 +131,13 @@ def test_jdbc_read_options(self, mocker): mock_python_engine_return_dataframe_type = mocker.patch( "hsfs.engine.python.Engine._return_dataframe_type" ) + query = "SELECT * FROM TABLE" python_engine = python.Engine() # Act python_engine._jdbc( - sql_query=None, + sql_query=query, connector=None, dataframe_type=None, read_options={"external": ""},