From 71079958a7bca0d85170bba3a243a52aef1c9770 Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Fri, 15 Nov 2024 19:38:34 +0530 Subject: [PATCH] [HUDI-8395] Fix metaClient handling when running upgrade or downgrade (#12224) * [HUDI-8395] Fix metaClient handling when running upgrade or downgrade --------- Co-authored-by: danny0405 --- .../hudi/client/BaseHoodieWriteClient.java | 2 +- .../hudi/table/upgrade/UpgradeDowngrade.java | 4 ---- .../apache/hudi/util/CommonClientUtils.java | 4 ++-- .../common/model/HoodieCommitMetadata.java | 12 ++++++---- .../common/table/HoodieTableMetaClient.java | 8 +++++++ .../common/testutils/HoodieTestUtils.java | 2 ++ .../TestMultipleTableVersionWriting.scala | 22 ++++++++++++++----- 7 files changed, 37 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 56f08381daa0e..3e258d892b642 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1437,7 +1437,6 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option insta new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper); if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) { - metaClient = HoodieTableMetaClient.reload(metaClient); // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits List instantsToRollback = tableServiceClient.getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime); @@ -1450,6 +1449,7 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option insta new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper) .run(HoodieTableVersion.current(), instantTime.orElse(null)); + metaClient.reloadTableConfig(); metaClient.reloadActiveTimeline(); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java index 20b351ff6be6f..9d7dc2df55c11 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java @@ -48,8 +48,6 @@ public class UpgradeDowngrade { private HoodieTableMetaClient metaClient; protected HoodieWriteConfig config; protected HoodieEngineContext context; - private StoragePath updatedPropsFilePath; - private StoragePath propsFilePath; public UpgradeDowngrade( HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context, @@ -57,8 +55,6 @@ public UpgradeDowngrade( this.metaClient = metaClient; this.config = config; this.context = context; - this.updatedPropsFilePath = new StoragePath(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE); - this.propsFilePath = new StoragePath(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE); this.upgradeDowngradeHelper = upgradeDowngradeHelper; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java index 320839af55bfc..a7d22d2e45356 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java @@ -28,8 +28,8 @@ public class CommonClientUtils { public static void validateTableVersion(HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) { // mismatch of table versions. if (!tableConfig.getTableVersion().equals(writeConfig.getWriteVersion())) { - throw new HoodieNotSupportedException(String.format("Table version (%s) and Writer version (%s) do not match.", - tableConfig.getTableVersion(), writeConfig.getWriteVersion())); + throw new HoodieNotSupportedException(String.format("Table version (%s) and Writer version (%s) do not match for table at: %s.", + tableConfig.getTableVersion(), writeConfig.getWriteVersion(), writeConfig.getBasePath())); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 46874bb10f8a1..22bf37d9c3dfc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.Serializable; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -500,10 +501,13 @@ public static T fromBytes(byte[] bytes, Class clazz) throws IOException { if (bytes.length == 0) { return clazz.newInstance(); } - return fromJsonString( - fromUTF8Bytes( - convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class)), - clazz); + try { + return fromJsonString(fromUTF8Bytes(convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class)), clazz); + } catch (Exception e) { + // fall back to the alternative method (0.x) + LOG.warn("Primary method failed; trying alternative deserialization method.", e); + return fromJsonString(new String(bytes, StandardCharsets.UTF_8), clazz); + } } catch (Exception e) { throw new IOException("unable to read commit metadata for bytes length: " + bytes.length, e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index d56689863c96f..30b4ff73d2f47 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -437,6 +437,14 @@ public synchronized HoodieActiveTimeline reloadActiveTimeline() { return activeTimeline; } + /** + * Reload the table config properties. + */ + public synchronized void reloadTableConfig() { + this.tableConfig = new HoodieTableConfig(this.storage, metaPath, + this.tableConfig.getRecordMergeMode(), this.tableConfig.getKeyGeneratorClassName(), this.tableConfig.getRecordMergeStrategyId()); + } + /** * Returns next instant time in the correct format. Lock is enabled by default. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java index fa5430d78ec23..ec8a6c8cb3164 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.HoodieStorageUtils; @@ -137,6 +138,7 @@ public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseF public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, HoodieTableVersion version) throws IOException { Properties properties = new Properties(); properties.setProperty(HoodieTableConfig.VERSION.key(), String.valueOf(version.versionCode())); + properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition"); return init(getDefaultStorageConf(), basePath, tableType, properties); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala index 15988d214d79d..fffd75f623749 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestMultipleTableVersionWriting.scala @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.HoodieTableVersion import org.apache.hudi.common.testutils.HoodieTestUtils import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException} +import org.apache.hudi.exception.{HoodieException, HoodieUpgradeDowngradeException} import org.apache.spark.sql.SaveMode import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -34,7 +34,7 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase { @Test def testTableVersionAndWriteVersionMatching(): Unit = { - val basePath = s"${tempBasePath}/tbl_1"; + val basePath = s"$tempBasePath/tbl_1" val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as partition") // write table with current version @@ -43,7 +43,7 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase { .mode(SaveMode.Overwrite) .save(basePath) - val metaClient = HoodieTestUtils.createMetaClient(basePath); + val metaClient = HoodieTestUtils.createMetaClient(basePath) assertEquals(HoodieTableVersion.current().versionCode(), metaClient.getTableConfig.getTableVersion.versionCode()) @@ -59,14 +59,24 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase { @Test def testThrowsExceptionForIncompatibleTableVersion(): Unit = { - val basePath = s"${tempBasePath}/tbl_2"; - HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.SIX); + val basePath = s"$tempBasePath/tbl_2" + HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.SIX) val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as partition") - assertThrows[HoodieNotSupportedException] { + // should error out when starting with table version 6 and writing with auto upgrade disabled + assertThrows[HoodieUpgradeDowngradeException] { df.write.format("hudi") + .option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false") .mode(SaveMode.Append) .save(basePath) } + // should succeed when writing with auto upgrade enabled (default) + df.write.format("hudi") + .mode(SaveMode.Append) + .save(basePath) + + val metaClient = HoodieTestUtils.createMetaClient(basePath) + assertEquals(HoodieTableVersion.current().versionCode(), + metaClient.getTableConfig.getTableVersion.versionCode()) } }