From 7ca8dc9a491b3eecdba549026937ae9fdd8e9040 Mon Sep 17 00:00:00 2001 From: Ralf Date: Mon, 30 Sep 2024 12:41:52 +0300 Subject: [PATCH] [FSTORE-1327] Activity UI doesn't correctly report Delta commit activities (#1384) --- java/beam/pom.xml | 2 +- java/flink/pom.xml | 2 +- java/hsfs/pom.xml | 2 +- java/pom.xml | 2 +- java/spark/pom.xml | 2 +- python/hsfs/core/delta_engine.py | 12 ++++++++++-- python/hsfs/version.py | 2 +- utils/java/pom.xml | 2 +- 8 files changed, 17 insertions(+), 9 deletions(-) diff --git a/java/beam/pom.xml b/java/beam/pom.xml index c825d8ec8d..f95e25a6ab 100644 --- a/java/beam/pom.xml +++ b/java/beam/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC3 + 3.8.0-RC4 4.0.0 diff --git a/java/flink/pom.xml b/java/flink/pom.xml index 6602e3401f..6a26a71e59 100644 --- a/java/flink/pom.xml +++ b/java/flink/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC3 + 3.8.0-RC4 4.0.0 diff --git a/java/hsfs/pom.xml b/java/hsfs/pom.xml index bb06c4c25e..f0e5cab6a1 100644 --- a/java/hsfs/pom.xml +++ b/java/hsfs/pom.xml @@ -5,7 +5,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC3 + 3.8.0-RC4 4.0.0 diff --git a/java/pom.xml b/java/pom.xml index e4990f1c3b..f55b77d542 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -7,7 +7,7 @@ com.logicalclocks hsfs-parent pom - 3.8.0-RC3 + 3.8.0-RC4 hsfs spark diff --git a/java/spark/pom.xml b/java/spark/pom.xml index fe86a03f9f..5cf82d4384 100644 --- a/java/spark/pom.xml +++ b/java/spark/pom.xml @@ -22,7 +22,7 @@ hsfs-parent com.logicalclocks - 3.8.0-RC3 + 3.8.0-RC4 4.0.0 diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 92edc91cf3..7defbb6f10 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias): @staticmethod def _get_last_commit_metadata(spark_context, base_path): fg_source_table = DeltaTable.forPath(spark_context, base_path) + + # Get info about the latest commit last_commit = fg_source_table.history(1).first().asDict() version = last_commit["version"] commit_timestamp = util.convert_event_time_to_timestamp( @@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path): commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp) operation_metrics = last_commit["operationMetrics"] + # Get info about the oldest remaining commit + oldest_commit = fg_source_table.history().orderBy("version").first().asDict() + oldest_commit_timestamp = util.convert_event_time_to_timestamp( + oldest_commit["timestamp"] + ) + if version == 0: fg_commit = feature_group_commit.FeatureGroupCommit( commitid=None, @@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numOutputRows"], rows_updated=0, rows_deleted=0, - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) else: fg_commit = feature_group_commit.FeatureGroupCommit( @@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numTargetRowsInserted"], rows_updated=operation_metrics["numTargetRowsUpdated"], rows_deleted=operation_metrics["numTargetRowsDeleted"], - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) return fg_commit diff --git a/python/hsfs/version.py b/python/hsfs/version.py index 9f4b8b0b54..d83e7d5b26 100644 --- a/python/hsfs/version.py +++ b/python/hsfs/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "3.8.0rc3" +__version__ = "3.8.0rc4" diff --git a/utils/java/pom.xml b/utils/java/pom.xml index d5e9055024..ff9bfb98cf 100644 --- a/utils/java/pom.xml +++ b/utils/java/pom.xml @@ -5,7 +5,7 @@ com.logicalclocks hsfs-utils - 3.8.0-RC3 + 3.8.0-RC4 3.2.0.0-SNAPSHOT