[HWORKS-1416] Close sparks sessions #1370

Merged (6 commits, Jul 10, 2024)
@@ -1097,5 +1097,14 @@ protected String makeQueryName(String queryName, FeatureGroupBase featureGroup)
}
return queryName;
}


public void closeSparkSession() {
if (getSparkSession() != null) {
try {
getSparkSession().sparkContext().stop();
Contributor:
Why are you calling stop() on the spark context and not close() on the Spark session?
The spark context is a "component" of the spark session. You want to close the entire session, not a single component.
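The reviewer's suggestion would amount to closing the session object itself, which is `Closeable` in Spark's Java API. A minimal sketch of the null-safe close pattern, using a stand-in `Session` type since no Spark runtime is assumed here:

```java
import java.io.Closeable;

public class CloseSessionSketch {
    // Stand-in for org.apache.spark.sql.SparkSession, which implements Closeable.
    static class Session implements Closeable {
        boolean closed = false;
        @Override
        public void close() { closed = true; }
    }

    static Session session = new Session();

    static Session getSparkSession() { return session; }

    public static void closeSparkSession() {
        Session s = getSparkSession();
        if (s != null) {
            // Close the whole session (which owns the context),
            // rather than stopping only the context.
            s.close();
        }
    }

    public static void main(String[] args) {
        closeSparkSession();
        System.out.println(session.closed);  // true
    }
}
```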

Contributor:
On the Python side you are correctly closing the session and not just the context.

} catch (Exception e) {
// No-OP
}
Contributor:
Why catch the exception in the first place? If something goes wrong in the `.stop()` it would be useful to see it in the logs, so we should avoid swallowing the exception.
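The reviewer's point can be sketched as follows: log the failure instead of swallowing it, so it shows up in the job logs while shutdown still proceeds. This uses `java.util.logging` for self-containment; the project's actual logger and message text would differ.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogInsteadOfSwallow {
    private static final Logger LOGGER =
        Logger.getLogger(LogInsteadOfSwallow.class.getName());

    // Run a stop action; log any failure instead of discarding it silently.
    public static void stopQuietly(Runnable stopAction) {
        try {
            stopAction.run();
        } catch (Exception e) {
            // The exception is surfaced in the logs but not rethrown,
            // so shutdown continues.
            LOGGER.log(Level.WARNING, "Failed to stop Spark session", e);
        }
    }

    public static void main(String[] args) {
        stopQuietly(() -> { throw new RuntimeException("boom"); });
        System.out.println("shutdown continued");
    }
}
```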

}
}
}
utils/java/src/main/java/com/logicalclocks/utils/MainClass.java (11 additions, 1 deletion)
@@ -116,7 +116,17 @@ public static void main(String[] args) throws Exception {
LOGGER.info("Hsfs utils write options: {}", writeOptions);

if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) {
SparkEngine.getInstance().streamToHudiTable(streamFeatureGroup, writeOptions);
SparkEngine engine = SparkEngine.getInstance();
boolean suceeded = false;
try {
engine.streamToHudiTable(streamFeatureGroup, writeOptions);
suceeded = true;
} finally {
LOGGER.info("Closing spark session...");
engine.closeSparkSession();
Contributor:
I would move this code outside the if statement, so that if an exception happens the job still shuts down correctly.
On the other hand, if by any chance the op is not one of those two, the if and the finally will be skipped entirely, leaving the session open.
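The restructuring the reviewer describes can be sketched like this: the op dispatch goes inside the try, and the finally closes the session for every op, and on exceptions. Op handling and the engine call are simplified stand-ins here, not the project's actual code.

```java
public class MainFlowSketch {
    static boolean sessionClosed = false;

    static void closeSparkSession() { sessionClosed = true; }

    static int run(String op) {
        boolean succeeded = false;
        try {
            if (op.equals("offline_fg_materialization") || op.equals("offline_fg_backfill")) {
                // engine.streamToHudiTable(streamFeatureGroup, writeOptions);
            }
            // ... handling for the other ops ...
            succeeded = true;
        } finally {
            // Reached for every op, and even if the body above throws.
            closeSparkSession();
        }
        return succeeded ? 0 : 1;
    }

    public static void main(String[] args) {
        int exitCode = run("run_feature_monitoring");
        System.out.println(exitCode + " " + sessionClosed);  // 0 true
    }
}
```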

LOGGER.info("Exiting with " + suceeded);
System.exit(suceeded ? 0 : 1);
}
}
}
}
utils/python/hsfs_utils.py (6 additions)
@@ -292,3 +292,9 @@ def parse_isoformat_date(da: str) -> datetime:
import_fg(job_conf)
elif args.op == "run_feature_monitoring":
run_feature_monitoring(job_conf)

if spark is not None:
try:
spark.stop()
except Exception as e:
Contributor:

Same here.

pass