Skip to content

Commit

Permalink
feat: temporarily disconnect metadata db during long analytics queries
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Dec 5, 2024
1 parent 638f82b commit e287277
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 23 deletions.
1 change: 1 addition & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ class D3TimeFormat(TypedDict, total=False):
# If on, you'll want to add "https://avatars.slack-edge.com" to the list of allowed
# domains in your TALISMAN_CONFIG
"SLACK_ENABLE_AVATARS": False,
"DISABLE_METADATA_DB_DURING_ANALYTICS": True,
}

# ------------------------------
Expand Down
72 changes: 51 additions & 21 deletions superset/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,35 @@
DB_CONNECTION_MUTATOR = config["DB_CONNECTION_MUTATOR"]


@contextmanager
def temporarily_disconnect_metadata_db():  # type: ignore
    """
    Temporarily disconnect the metadata database session during an analytics query.

    The goal is to lower the number of concurrent connections to the metadata
    database, given that Superset has no control over the duration of the
    analytics query. If using a connection pool, the connection is released back
    to the pool for the duration; if not, it is closed and has to be
    re-established afterwards.

    Only has an effect if the feature flag DISABLE_METADATA_DB_DURING_ANALYTICS
    is on.
    """
    # Evaluate the flag once so behavior stays consistent even if the flag is
    # flipped while the analytics query is running.
    if is_feature_enabled("DISABLE_METADATA_DB_DURING_ANALYTICS"):
        # Release the session's metadata-DB connection (returned to the pool,
        # or closed if no pool is in use) before the long-running query starts.
        db.session.close()
    # No explicit reconnect is needed on exit: the scoped session transparently
    # re-establishes a connection on its next use. (A manual
    # db.engine.connect() here would leak the connection, and SQLAlchemy's
    # Connection has no open() method to call.)
    yield


class KeyValue(Model): # pylint: disable=too-few-public-methods
"""Used for any type of key-value store"""

Expand Down Expand Up @@ -691,27 +720,28 @@ def _log_query(sql: str) -> None:
with self.get_raw_connection(catalog=catalog, schema=schema) as conn:
cursor = conn.cursor()
df = None
for i, sql_ in enumerate(sqls):
sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
_log_query(sql_)
with event_logger.log_context(
action="execute_sql",
database=self,
object_ref=__name__,
):
self.db_engine_spec.execute(cursor, sql_, self)
if i < len(sqls) - 1:
# If it's not the last, we don't keep the results
cursor.fetchall()
else:
# Last query, fetch and process the results
data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
if mutator:
df = mutator(df)
with temporarily_disconnect_metadata_db():
for i, sql_ in enumerate(sqls):
sql_ = self.mutate_sql_based_on_config(sql_, is_split=True)
_log_query(sql_)
with event_logger.log_context(
action="execute_sql",
database=self,
object_ref=__name__,
):
self.db_engine_spec.execute(cursor, sql_, self)
if i < len(sqls) - 1:
# If it's not the last, we don't keep the results
cursor.fetchall()
else:
# Last query, fetch and process the results
data = self.db_engine_spec.fetch_data(cursor)
result_set = SupersetResultSet(
data, cursor.description, self.db_engine_spec
)
df = result_set.to_pandas_df()
if mutator:
df = mutator(df)

return self.post_process_df(df)

Expand Down
5 changes: 3 additions & 2 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
SupersetParseError,
)
from superset.extensions import celery_app, event_logger
from superset.models.core import Database
from superset.models.core import Database, temporarily_disconnect_metadata_db
from superset.models.sql_lab import Query
from superset.result_set import SupersetResultSet
from superset.sql.parse import SQLStatement, Table
Expand Down Expand Up @@ -304,7 +304,8 @@ def execute_sql_statement( # pylint: disable=too-many-statements, too-many-loca
object_ref=__name__,
):
with stats_timing("sqllab.query.time_executing_query", stats_logger):
db_engine_spec.execute_with_cursor(cursor, sql, query)
with temporarily_disconnect_metadata_db():
db_engine_spec.execute_with_cursor(cursor, sql, query)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
Expand Down

0 comments on commit e287277

Please sign in to comment.