diff --git a/superset/config.py b/superset/config.py
index 8a490a982d46d..a4dbe5a877e40 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -563,6 +563,9 @@ class D3TimeFormat(TypedDict, total=False):
     # If on, you'll want to add "https://avatars.slack-edge.com" to the list of allowed
     # domains in your TALISMAN_CONFIG
     "SLACK_ENABLE_AVATARS": False,
+    # When enabled, the metadata database session is closed while an analytics
+    # query is running, to reduce concurrent metadata-database connections.
+    "DISABLE_METADATA_DB_DURING_ANALYTICS": False,
 }
 
 # ------------------------------
diff --git a/superset/models/core.py b/superset/models/core.py
index 9b06932afc890..94e915f0d7c67 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -95,6 +95,34 @@
 
 DB_CONNECTION_MUTATOR = config["DB_CONNECTION_MUTATOR"]
 
+@contextmanager
+def temporarily_disconnect_metadata_db():  # type: ignore
+    """
+    Temporarily disconnect the metadata database session while an analytics
+    query is running.
+
+    The goal is to lower the number of concurrent connections to the metadata
+    database, given that Superset has no control over the duration of the
+    analytics query.
+
+    If a connection pool is used, the connection is released back to the pool
+    for the duration; otherwise it is closed and re-established lazily on the
+    next session use.
+
+    NOTE(review): closing the session detaches ORM objects loaded from it;
+    code inside the block must not trigger lazy loads on those objects.
+
+    Only has an effect if the feature flag DISABLE_METADATA_DB_DURING_ANALYTICS
+    is enabled.
+    """
+    if is_feature_enabled("DISABLE_METADATA_DB_DURING_ANALYTICS"):
+        # Releasing the session's connection is enough: a closed SQLAlchemy
+        # session transparently acquires a new connection on next use, so no
+        # explicit reconnection (or private _make_scoped_session call) is needed.
+        db.session.close()
+    yield
+
+
 class KeyValue(Model):  # pylint: disable=too-few-public-methods
 
     """Used for any type of key-value store"""
 
@@ -691,27 +719,28 @@ def _log_query(sql: str) -> None:
         with self.get_raw_connection(catalog=catalog, schema=schema) as conn:
             cursor = conn.cursor()
             df = None
-            for i, sql_ in enumerate(sqls):
-                sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
-                _log_query(sql_)
-                with event_logger.log_context(
-                    action="execute_sql",
-                    database=self,
-                    object_ref=__name__,
-                ):
-                    self.db_engine_spec.execute(cursor, sql_, self)
-                if i < len(sqls) - 1:
-                    # If it's not the last, we don't keep the results
-                    cursor.fetchall()
-                else:
-                    # Last query, fetch and process the results
-                    data = self.db_engine_spec.fetch_data(cursor)
-                    result_set = SupersetResultSet(
-                        data, cursor.description, self.db_engine_spec
-                    )
-                    df = result_set.to_pandas_df()
-                    if mutator:
-                        df = mutator(df)
+            with temporarily_disconnect_metadata_db():
+                for i, sql_ in enumerate(sqls):
+                    sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
+                    _log_query(sql_)
+                    with event_logger.log_context(
+                        action="execute_sql",
+                        database=self,
+                        object_ref=__name__,
+                    ):
+                        self.db_engine_spec.execute(cursor, sql_, self)
+                    if i < len(sqls) - 1:
+                        # If it's not the last, we don't keep the results
+                        cursor.fetchall()
+                    else:
+                        # Last query, fetch and process the results
+                        data = self.db_engine_spec.fetch_data(cursor)
+                        result_set = SupersetResultSet(
+                            data, cursor.description, self.db_engine_spec
+                        )
+                        df = result_set.to_pandas_df()
+                        if mutator:
+                            df = mutator(df)
 
             return self.post_process_df(df)
 
diff --git a/superset/sql_lab.py b/superset/sql_lab.py
index 88e1bc1aad858..537fe9ca1df0a 100644
--- a/superset/sql_lab.py
+++ b/superset/sql_lab.py
@@ -50,7 +50,7 @@
     SupersetParseError,
 )
 from superset.extensions import celery_app, event_logger
-from superset.models.core import Database
+from superset.models.core import Database, temporarily_disconnect_metadata_db
 from superset.models.sql_lab import Query
 from superset.result_set import SupersetResultSet
 from superset.sql.parse import SQLStatement, Table
@@ -304,7 +304,8 @@ def execute_sql_statement(  # pylint: disable=too-many-statements, too-many-loca
         object_ref=__name__,
     ):
         with stats_timing("sqllab.query.time_executing_query", stats_logger):
-            db_engine_spec.execute_with_cursor(cursor, sql, query)
+            with temporarily_disconnect_metadata_db():
+                db_engine_spec.execute_with_cursor(cursor, sql, query)
 
         with stats_timing("sqllab.query.time_fetching_results", stats_logger):
             logger.debug(