Skip to content

Commit

Permalink
Fix run_results.json Performance Regression (#8413)
Browse files Browse the repository at this point in the history
* Remedy performance regression by only writing run_results.json once.

* Write results before cleaning up connections.
  • Loading branch information
peterallenwebb authored Aug 15, 2023
1 parent cb4bc2d commit 18ee93c
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230815-104444.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Remedy performance regression by only writing run_results.json once.
time: 2023-08-15T10:44:44.836991-04:00
custom:
Author: peterallenwebb
Issue: "8360"
23 changes: 12 additions & 11 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,15 +313,6 @@ def _handle_result(self, result):
cause = None
self._mark_dependent_errors(node.unique_id, result, cause)

interim_run_result = self.get_result(
results=self.node_results,
elapsed_time=time.time() - self.started_at,
generated_at=datetime.utcnow(),
)

if self.args.write_json and hasattr(interim_run_result, "write"):
interim_run_result.write(self.result_path())

def _cancel_connections(self, pool):
"""Given a pool, cancel all adapter connections and wait until all
runners gentle terminates.
Expand Down Expand Up @@ -378,8 +369,18 @@ def execute_nodes(self):
# ensure information about all nodes is propagated to run results when failing fast
return self.node_results
except KeyboardInterrupt:
run_result = self.get_result(
results=self.node_results,
elapsed_time=time.time() - self.started_at,
generated_at=datetime.utcnow(),
)

if self.args.write_json and hasattr(run_result, "write"):
run_result.write(self.result_path())

self._cancel_connections(pool)
print_run_end_messages(self.node_results, keyboard_interrupt=True)

raise

pool.close()
Expand Down Expand Up @@ -443,7 +444,7 @@ def run(self):
Run dbt for the query, based on the graph.
"""
# We set up a context manager here with "task_contextvars" because we
# we need the project_root in runtime_initialize.
# need the project_root in runtime_initialize.
with task_contextvars(project_root=self.config.project_root):
self._runtime_initialize()

Expand Down Expand Up @@ -584,7 +585,7 @@ def create_schema(relation: BaseRelation) -> None:
create_futures.append(fut)

for create_future in as_completed(create_futures):
# trigger/re-raise any excceptions while creating schemas
# trigger/re-raise any exceptions while creating schemas
create_future.result()

def get_result(self, results, elapsed_time, generated_at):
Expand Down

0 comments on commit 18ee93c

Please sign in to comment.