Skip to content

Commit

Permalink
[HUDI-8395] Fix metaClient handling when running upgrade or downgrade (
Browse files Browse the repository at this point in the history
…apache#12224)

* [HUDI-8395] Fix metaClient handling when running upgrade or downgrade

---------

Co-authored-by: danny0405 <[email protected]>
  • Loading branch information
codope and danny0405 authored Nov 15, 2024
1 parent 8792638 commit 7107995
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,6 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> insta
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper);

if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
metaClient = HoodieTableMetaClient.reload(metaClient);
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
List<String> instantsToRollback = tableServiceClient.getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, instantTime);

Expand All @@ -1450,6 +1449,7 @@ protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> insta
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
.run(HoodieTableVersion.current(), instantTime.orElse(null));

metaClient.reloadTableConfig();
metaClient.reloadActiveTimeline();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,13 @@ public class UpgradeDowngrade {
private HoodieTableMetaClient metaClient;
protected HoodieWriteConfig config;
protected HoodieEngineContext context;
private StoragePath updatedPropsFilePath;
private StoragePath propsFilePath;

public UpgradeDowngrade(
HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
this.metaClient = metaClient;
this.config = config;
this.context = context;
this.updatedPropsFilePath = new StoragePath(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
this.propsFilePath = new StoragePath(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class CommonClientUtils {
public static void validateTableVersion(HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
// mismatch of table versions.
if (!tableConfig.getTableVersion().equals(writeConfig.getWriteVersion())) {
throw new HoodieNotSupportedException(String.format("Table version (%s) and Writer version (%s) do not match.",
tableConfig.getTableVersion(), writeConfig.getWriteVersion()));
throw new HoodieNotSupportedException(String.format("Table version (%s) and Writer version (%s) do not match for table at: %s.",
tableConfig.getTableVersion(), writeConfig.getWriteVersion(), writeConfig.getBasePath()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -500,10 +501,13 @@ public static <T> T fromBytes(byte[] bytes, Class<T> clazz) throws IOException {
if (bytes.length == 0) {
return clazz.newInstance();
}
return fromJsonString(
fromUTF8Bytes(
convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class)),
clazz);
try {
return fromJsonString(fromUTF8Bytes(convertCommitMetadataToJsonBytes(deserializeCommitMetadata(bytes), org.apache.hudi.avro.model.HoodieCommitMetadata.class)), clazz);
} catch (Exception e) {
// fall back to the alternative method (0.x)
LOG.warn("Primary method failed; trying alternative deserialization method.", e);
return fromJsonString(new String(bytes, StandardCharsets.UTF_8), clazz);
}
} catch (Exception e) {
throw new IOException("unable to read commit metadata for bytes length: " + bytes.length, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ public synchronized HoodieActiveTimeline reloadActiveTimeline() {
return activeTimeline;
}

/**
* Reload the table config properties.
*/
public synchronized void reloadTableConfig() {
this.tableConfig = new HoodieTableConfig(this.storage, metaPath,
this.tableConfig.getRecordMergeMode(), this.tableConfig.getKeyGeneratorClassName(), this.tableConfig.getRecordMergeStrategyId());
}

/**
* Returns next instant time in the correct format. Lock is enabled by default.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
Expand Down Expand Up @@ -137,6 +138,7 @@ public static HoodieTableMetaClient init(String basePath, HoodieFileFormat baseF
public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, HoodieTableVersion version) throws IOException {
Properties properties = new Properties();
properties.setProperty(HoodieTableConfig.VERSION.key(), String.valueOf(version.versionCode()));
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition");
return init(getDefaultStorageConf(), basePath, tableType, properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableVersion
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
import org.apache.hudi.exception.{HoodieException, HoodieUpgradeDowngradeException}
import org.apache.spark.sql.SaveMode
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
Expand All @@ -34,7 +34,7 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase {

@Test
def testTableVersionAndWriteVersionMatching(): Unit = {
val basePath = s"${tempBasePath}/tbl_1";
val basePath = s"$tempBasePath/tbl_1"
val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as partition")

// write table with current version
Expand All @@ -43,7 +43,7 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)

val metaClient = HoodieTestUtils.createMetaClient(basePath);
val metaClient = HoodieTestUtils.createMetaClient(basePath)
assertEquals(HoodieTableVersion.current().versionCode(),
metaClient.getTableConfig.getTableVersion.versionCode())

Expand All @@ -59,14 +59,24 @@ class TestMultipleTableVersionWriting extends HoodieSparkWriterTestBase {

@Test
def testThrowsExceptionForIncompatibleTableVersion(): Unit = {
val basePath = s"${tempBasePath}/tbl_2";
HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.SIX);
val basePath = s"$tempBasePath/tbl_2"
HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, HoodieTableVersion.SIX)

val df = spark.range(1).selectExpr("1 as id", "1 as name", "1 as partition")
assertThrows[HoodieNotSupportedException] {
// should error out when starting with table version 6 and writing with auto upgrade disabled
assertThrows[HoodieUpgradeDowngradeException] {
df.write.format("hudi")
.option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key(), "false")
.mode(SaveMode.Append)
.save(basePath)
}
// should succeed when writing with auto upgrade enabled (default)
df.write.format("hudi")
.mode(SaveMode.Append)
.save(basePath)

val metaClient = HoodieTestUtils.createMetaClient(basePath)
assertEquals(HoodieTableVersion.current().versionCode(),
metaClient.getTableConfig.getTableVersion.versionCode())
}
}

0 comments on commit 7107995

Please sign in to comment.