From 022a44c26371d3024cbbed323554f310fd13e733 Mon Sep 17 00:00:00 2001 From: Ryan Pifer Date: Wed, 11 Nov 2020 00:44:57 -0800 Subject: [PATCH] [RFC-15][HUDI-1325] Merge updates of unsynced instants to metadata table --- .../HoodieBackedTableMetadataWriter.java | 240 +------------- .../client/TestCompactionAdminClient.java | 6 + .../hudi/metadata/TestHoodieFsMetadata.java | 106 +++++- .../table/upgrade/TestUpgradeDowngrade.java | 6 + .../testutils/HoodieClientTestHarness.java | 9 +- .../metadata/HoodieBackedTableMetadata.java | 16 +- ...dieMetadataMergedInstantRecordScanner.java | 100 ++++++ .../HoodieTableMetadataTimelineUtil.java | 312 ++++++++++++++++++ 8 files changed, 542 insertions(+), 253 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataTimelineUtil.java diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 992c240e90f0..2c17fdb211dc 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -44,10 +44,8 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; -import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -55,7 +53,6 @@ import org.apache.hudi.config.HoodieMetadataConfig; import org.apache.hudi.config.HoodieMetricsConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.metrics.DistributedRegistry; @@ -85,7 +82,6 @@ import scala.Tuple2; import static org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX; -import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; /** @@ -388,59 +384,11 @@ private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient datase LOG.info("Syncing " + instantsToSync.size() + " instants to metadata table: " + instantsToSync); // Read each instant in order and sync it to metadata table - final HoodieActiveTimeline timeline = datasetMetaClient.getActiveTimeline(); for (HoodieInstant instant : instantsToSync) { LOG.info("Syncing instant " + instant + " to metadata table"); - - switch (instant.getAction()) { - case HoodieTimeline.CLEAN_ACTION: { - // CLEAN is synced from the - // - inflight instant which contains the HoodieCleanerPlan, or - // - complete instant which contains the HoodieCleanMetadata - try { - HoodieInstant inflightCleanInstant = new HoodieInstant(true, instant.getAction(), instant.getTimestamp()); - ValidationUtils.checkArgument(inflightCleanInstant.isInflight()); - HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant); - update(cleanerPlan, instant.getTimestamp()); - } catch (HoodieIOException e) { - HoodieInstant cleanInstant = new HoodieInstant(false, instant.getAction(), instant.getTimestamp()); - ValidationUtils.checkArgument(cleanInstant.isCompleted()); - HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant); - update(cleanMetadata, instant.getTimestamp()); - } - break; - } - case HoodieTimeline.DELTA_COMMIT_ACTION: - case HoodieTimeline.COMMIT_ACTION: - case HoodieTimeline.COMPACTION_ACTION: { - ValidationUtils.checkArgument(instant.isCompleted()); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - update(commitMetadata, instant.getTimestamp()); - break; - } - case HoodieTimeline.ROLLBACK_ACTION: { - ValidationUtils.checkArgument(instant.isCompleted()); - HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( - timeline.getInstantDetails(instant).get()); - update(rollbackMetadata, instant.getTimestamp()); - break; - } - case HoodieTimeline.RESTORE_ACTION: { - ValidationUtils.checkArgument(instant.isCompleted()); - HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( - timeline.getInstantDetails(instant).get()); - update(restoreMetadata, instant.getTimestamp()); - break; - } - case HoodieTimeline.SAVEPOINT_ACTION: { - ValidationUtils.checkArgument(instant.isCompleted()); - // Nothing to be done here - break; - } - default: { - throw new HoodieException("Unknown type of action " + instant.getAction()); - } + Option> records = HoodieTableMetadataTimelineUtil.convertInstantToMetaRecords(datasetMetaClient, instant); + if (records.isPresent()) { + commit(jsc, prepRecords(jsc, records.get(), MetadataPartitionType.FILES.partitionPath()), instant.getTimestamp()); } } // re-init the table metadata, for any future writes. @@ -462,38 +410,7 @@ public void update(HoodieCommitMetadata commitMetadata, String instantTime) { return; } - List records = new LinkedList<>(); - List allPartitions = new LinkedList<>(); - commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { - final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName; - allPartitions.add(partition); - - Map newFiles = new HashMap<>(writeStats.size()); - writeStats.forEach(hoodieWriteStat -> { - String pathWithPartition = hoodieWriteStat.getPath(); - if (pathWithPartition == null) { - // Empty partition - return; - } - - int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1; - String filename = pathWithPartition.substring(offset); - ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); - newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); - }); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( - partition, Option.of(newFiles), Option.empty()); - records.add(record); - }); - - // New partitions created - HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList<>(allPartitions)); - records.add(record); - - LOG.info("Updating at " + instantTime + " from Commit/" + commitMetadata.getOperationType() - + ". 
#partitions_updated=" + records.size()); + List records = HoodieTableMetadataTimelineUtil.convertMetadataToRecords(commitMetadata, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } @@ -516,21 +433,7 @@ public void update(HoodieCleanerPlan cleanerPlan, String instantTime) { return; } - List records = new LinkedList<>(); - int[] fileDeleteCount = {0}; - cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> { - fileDeleteCount[0] += deletedPathInfo.size(); - - // Files deleted from a partition - List deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName()) - .collect(Collectors.toList()); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), - Option.of(deletedFilenames)); - records.add(record); - }); - - LOG.info("Updating at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); + List records = HoodieTableMetadataTimelineUtil.convertMetadataToRecords(cleanerPlan, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } @@ -546,21 +449,7 @@ public void update(HoodieCleanMetadata cleanMetadata, String instantTime) { return; } - List records = new LinkedList<>(); - int[] fileDeleteCount = {0}; - - cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { - // Files deleted from a partition - List deletedFiles = partitionMetadata.getSuccessDeleteFiles(); - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), - Option.of(new ArrayList<>(deletedFiles))); - - records.add(record); - fileDeleteCount[0] += deletedFiles.size(); - }); - - LOG.info("Updating at " + instantTime + " from Clean. #partitions_updated=" + records.size() - + ", #files_deleted=" + fileDeleteCount[0]); + List records = HoodieTableMetadataTimelineUtil.convertMetadataToRecords(cleanMetadata, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } @@ -576,12 +465,8 @@ public void update(HoodieRestoreMetadata restoreMetadata, String instantTime) { return; } - Map> partitionToAppendedFiles = new HashMap<>(); - Map> partitionToDeletedFiles = new HashMap<>(); - restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { - rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles)); - }); - commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); + List records = HoodieTableMetadataTimelineUtil.convertMetadataToRecords(restoreMetadata, instantTime); + commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } /** @@ -596,114 +481,7 @@ public void update(HoodieRollbackMetadata rollbackMetadata, String instantTime) return; } - Map> partitionToAppendedFiles = new HashMap<>(); - Map> partitionToDeletedFiles = new HashMap<>(); - processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles); - commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); - } - - /** - * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. - * - * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. 
This - * function will extract this change file for each partition. - * - * @param rollbackMetadata {@code HoodieRollbackMetadata} - * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. - * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. - */ - private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, - Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles) { - rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { - final String partition = pm.getPartitionPath(); - - if (!pm.getSuccessDeleteFiles().isEmpty()) { - if (!partitionToDeletedFiles.containsKey(partition)) { - partitionToDeletedFiles.put(partition, new ArrayList<>()); - } - - // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() - List deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) - .collect(Collectors.toList()); - partitionToDeletedFiles.get(partition).addAll(deletedFiles); - } - - if (!pm.getAppendFiles().isEmpty()) { - if (!partitionToAppendedFiles.containsKey(partition)) { - partitionToAppendedFiles.put(partition, new HashMap<>()); - } - - // Extract appended file name from the absolute paths saved in getAppendFiles() - pm.getAppendFiles().forEach((path, size) -> { - partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { - return size + oldSize; - }); - }); - } - }); - } - - /** - * Create file delete records and commit. - * - * @param partitionToDeletedFiles {@code Map} of partitions and the deleted files - * @param instantTime Timestamp at which the deletes took place - * @param operation Type of the operation which caused the files to be deleted - */ - private void commitRollback(JavaSparkContext jsc, Map> partitionToDeletedFiles, - Map> partitionToAppendedFiles, String instantTime, - String operation) { - List records = new LinkedList<>(); - int[] fileChangeCount = {0, 0}; // deletes, appends - - partitionToDeletedFiles.forEach((partition, deletedFiles) -> { - // Rollbacks deletes instants from timeline. The instant being rolled-back may not have been synced to the - // metadata table. Hence, the deleted filed need to be checked against the metadata. - try { - FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new Path(metadata.getDatasetBasePath(), partition)); - Set currentFiles = - Arrays.stream(existingStatuses).map(s -> s.getPath().getName()).collect(Collectors.toSet()); - - int origCount = deletedFiles.size(); - deletedFiles.removeIf(f -> !currentFiles.contains(f)); - if (deletedFiles.size() != origCount) { - LOG.warn("Some Files to be deleted as part of " + operation + " at " + instantTime + " were not found in the " - + " metadata for partition " + partition - + ". 
To delete = " + origCount + ", found=" + deletedFiles.size()); - } - - fileChangeCount[0] += deletedFiles.size(); - - Option> filesAdded = Option.empty(); - if (partitionToAppendedFiles.containsKey(partition)) { - filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); - } - - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, - Option.of(new ArrayList<>(deletedFiles))); - records.add(record); - } catch (IOException e) { - throw new HoodieMetadataException("Failed to commit rollback deletes at instant " + instantTime, e); - } - }); - - partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { - fileChangeCount[1] += appendedFileMap.size(); - - // Validate that no appended file has been deleted - ValidationUtils.checkState( - !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), - "Rollback file cannot both be appended and deleted"); - - // New files added to a partition - HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap), - Option.empty()); - records.add(record); - }); - - LOG.info("Updating at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size() - + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); + List records = HoodieTableMetadataTimelineUtil.convertMetadataToRecords(rollbackMetadata, instantTime); commit(jsc, prepRecords(jsc, records, MetadataPartitionType.FILES.partitionPath()), instantTime); } diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java index 1200f67cc079..c42110e7ca3c 100644 --- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java +++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java @@ -37,6 +37,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,6 +71,11 @@ public void setUp() throws Exception { client = new CompactionAdminClient(jsc, basePath); } + @AfterEach + public void cleanUp() throws Exception { + cleanupResources(); + } + @Test public void testUnscheduleCompactionPlan() throws Exception { int numEntriesPerInstant = 10; diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java index 7dfb67c295cd..7995fd3961ad 100644 --- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java +++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java @@ -101,7 +101,7 @@ public void init(HoodieTableType tableType, boolean useDFS) throws IOException { initDFS(); dfs.mkdirs(new Path(basePath)); } - initMetaClient(); + initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); @@ -466,10 +466,9 @@ public void testSync(HoodieTableType tableType) throws Exception { newCommitTime = HoodieActiveTimeline.createNewInstantTime(); client.startCommitWithTime(newCommitTime); - validateMetadata(client); assertTrue(metadata(client).isInSync()); + validateMetadata(client); } - } /** @@ -634,6 +633,78 @@ public void testMetadataMetrics() throws Exception { } } + @ParameterizedTest + 
@EnumSource(HoodieTableType.class) + public void testMetadataOutOfSync(HoodieTableType tableType) throws Exception { + init(tableType); + + HoodieWriteClient unsyncedClient = new HoodieWriteClient<>(jsc, getWriteConfig(true, true)); + + // Enable metadata so table is initialized + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, true))) { + // Perform Bulk Insert + String newCommitTime = "001"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 20); + client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect(); + } + + // Perform commit operations with metadata disabled + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + // Perform Insert + String newCommitTime = "002"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateInserts(newCommitTime, 20); + client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Perform Upsert + newCommitTime = "003"; + client.startCommitWithTime(newCommitTime); + records = dataGen.generateUniqueUpdates(newCommitTime, 20); + client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + // Compaction + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { + newCommitTime = "004"; + client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); + client.compact(newCommitTime); + } + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + + // Perform clean operation with metadata disabled + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + // One more commit needed to trigger clean so upsert and compact + String newCommitTime = "005"; + client.startCommitWithTime(newCommitTime); + List records = dataGen.generateUpdates(newCommitTime, 20); + client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); + + if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) { + newCommitTime = "006"; + client.scheduleCompactionAtInstant(newCommitTime, Option.empty()); + client.compact(newCommitTime); + } + + // Clean + newCommitTime = "007"; + client.clean(newCommitTime); + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + + // Perform restore with metadata disabled + try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, getWriteConfig(true, false))) { + client.restoreToInstant("004"); + } + + assertFalse(metadata(unsyncedClient).isInSync()); + validateMetadata(unsyncedClient); + } + /** * Validate the metadata tables contents to ensure it matches what is on the file system. 
* @@ -641,25 +712,18 @@ public void testMetadataMetrics() throws Exception { */ private void validateMetadata(HoodieWriteClient client) throws IOException { HoodieWriteConfig config = client.getConfig(); - HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); - assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + HoodieBackedTableMetadata metadataReader = metadata(client); + assertNotNull(metadataReader, "MetadataReader should have been initialized"); if (!config.useFileListingMetadata()) { return; } HoodieTimer timer = new HoodieTimer().startTimer(); - // Validate write config for metadata table - HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); - assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); - assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); - - // Metadata table should be in sync with the dataset - assertTrue(metadata(client).isInSync()); - // Partitions should match List fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(dfs, basePath); - List metadataPartitions = metadataWriter.metadata().getAllPartitionPaths(); + List metadataPartitions = metadataReader.getAllPartitionPaths(); Collections.sort(fsPartitions); Collections.sort(metadataPartitions); @@ -681,7 +745,7 @@ private void validateMetadata(HoodieWriteClient client) throws IOException { partitionPath = new Path(basePath, partition); } FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(dfs, partitionPath); - FileStatus[] metaStatuses = metadataWriter.metadata().getAllFilesInPartition(partitionPath); + FileStatus[] metaStatuses = metadataReader.getAllFilesInPartition(partitionPath); List fsFileNames = Arrays.stream(fsStatuses) .map(s -> s.getPath().getName()).collect(Collectors.toList()); List metadataFilenames = Arrays.stream(metaStatuses) @@ -717,10 +781,18 @@ private void validateMetadata(HoodieWriteClient client) throws IOException { } }); - HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); + HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client); + assertNotNull(metadataWriter, "MetadataWriter should have been initialized"); + + // Validate write config for metadata table + HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig(); + assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata table for metadata table"); + assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify for metadata table"); // Metadata table should be in sync with the dataset - assertTrue(metadataWriter.metadata().isInSync()); + assertTrue(metadata(client).isInSync()); + + HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath); // Metadata table is MOR assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR"); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index e1dc4cefa145..da5f434c9897 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -46,6 +46,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import 
org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -91,6 +92,11 @@ public void setUp() throws Exception { initDFSMetaClient(); } + @AfterEach + public void cleanUp() throws Exception { + cleanupResources(); + } + @Test public void testLeftOverUpdatedPropFileCleanup() throws IOException { testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ); diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index f1e3f175ea9a..5fda8df11c01 100644 --- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -21,6 +21,7 @@ import org.apache.hudi.client.HoodieWriteClient; import org.apache.hudi.client.SparkTaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -42,6 +43,7 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; @@ -189,6 +191,11 @@ protected void cleanupFileSystem() throws IOException { * @throws IOException */ protected void initMetaClient() throws IOException { + initMetaClient(getTableType()); + } + + + protected void initMetaClient(HoodieTableType tableType) throws IOException { if (basePath == null) { throw new IllegalStateException("The base path has not been initialized."); } @@ -197,7 +204,7 @@ protected void initMetaClient() throws IOException { throw new IllegalStateException("The Spark context has not been initialized."); } - metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType()); + metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 5ce933d3716a..a64bbcf6a0b0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -87,6 +87,7 @@ public class HoodieBackedTableMetadata implements HoodieTableMetadata { // Readers for the base and log file which store the metadata private transient HoodieFileReader basefileReader; private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; + private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner; public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled, boolean validateLookups, boolean assumeDatePartitioning) { @@ -237,10 +238,6 @@ FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException { FileStatus[] statuses = {}; if (hoodieRecord.isPresent()) { - if (!hoodieRecord.get().getData().getDeletions().isEmpty()) { - throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: " - + hoodieRecord.get().getData()); - } statuses = 
hoodieRecord.get().getData().getFileStatuses(partitionPath); } @@ -309,6 +306,13 @@ private Option> getMergedRecordByKey(String } } + // Retrieve record from unsynced timeline instants + Option> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key); + if (timelineHoodieRecord.isPresent()) { + HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(hoodieRecord.getData()); + hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload); + } + return Option.ofNullable(hoodieRecord); } @@ -367,6 +371,10 @@ private synchronized void openBaseAndLogFiles() throws IOException { LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime + "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")"); + List unsyncedInstants = findInstantsToSync(datasetMetaClient); + timelineRecordScanner = + new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants, schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null); + metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer())); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java new file mode 100644 index 000000000000..2aac670291a1 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java @@ -0,0 +1,100 @@ +package org.apache.hudi.metadata; + +import org.apache.avro.Schema; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.DefaultSizeEstimator; +import org.apache.hudi.common.util.HoodieRecordSizeEstimator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +/** + * Provides functionality to convert timeline instants to table metadata records and then merge by key. Specify + * a filter to limit keys that are merged and stored in memory + */ +public class HoodieMetadataMergedInstantRecordScanner { + + private static final Logger LOG = LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class); + + HoodieTableMetaClient metaClient; + private List instants; + private Set mergeKeyFilter; + protected final ExternalSpillableMap> records; + + public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient metaClient, List instants, + Schema readerSchema, Long maxMemorySizeInBytes, + String spillableMapBasePath, Set mergeKeyFilter) throws IOException { + this.metaClient = metaClient; + this.instants = instants; + this.mergeKeyFilter = mergeKeyFilter != null ? 
mergeKeyFilter : Collections.emptySet(); + this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), + new HoodieRecordSizeEstimator(readerSchema)); + + scan(); + } + + + private void scan() { + for (HoodieInstant instant : instants) { + try { + processInstant(instant); + } catch (Exception e) { + LOG.error(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e); + throw new HoodieException(String.format("Got exception when processing timeline instant %s", instant.getTimestamp()), e); + } + } + } + + /** + * Converts an instant to metadata table records and processes each record + * + * @param instant + * @throws IOException + */ + private void processInstant(HoodieInstant instant) throws IOException { + Option> records = HoodieTableMetadataTimelineUtil.convertInstantToMetaRecords(metaClient, instant); + if (records.isPresent()) { + records.get().forEach(record -> processNextRecord(record)); + } + } + + /** + * Process metadata table record by merging with existing record if it is a part of the key filter + * + * @param hoodieRecord + */ + private void processNextRecord(HoodieRecord hoodieRecord) { + String key = hoodieRecord.getRecordKey(); + if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) { + if (records.containsKey(key)) { + // Merge and store the merged record + HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(records.get(key).getData()); + records.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue)); + } else { + // Put the record as is + records.put(key, hoodieRecord); + } + } + } + + /** + * Retrieve merged hoodie record for given key + * + * @param key of the record to retrieve + * @return {@code HoodieRecord} if key was found else {@code Option.empty()} + */ + public Option> getRecordByKey(String key) { + return Option.ofNullable((HoodieRecord) records.get(key)); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataTimelineUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataTimelineUtil.java new file mode 100644 index 000000000000..3c9392b03676 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataTimelineUtil.java @@ -0,0 +1,312 @@ +package org.apache.hudi.metadata; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieCleanMetadata; +import org.apache.hudi.avro.model.HoodieCleanerPlan; +import org.apache.hudi.avro.model.HoodieRestoreMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; +import org.apache.hudi.common.util.CleanerUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; + +/** + * A utility to convert timeline 
information to metadata table records + */ +public class HoodieTableMetadataTimelineUtil { + + private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataTimelineUtil.class); + + + /** + * Converts a timeline instant to metadata table records + * + * @param datasetMetaClient The meta client associated with the timeline instant + * @param instant to fetch and convert to metadata table records + * @return a list of metadata table records + * @throws IOException + */ + public static Option> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, HoodieInstant instant) throws IOException { + + HoodieTimeline timeline = datasetMetaClient.getActiveTimeline(); + Option> records = Option.empty(); + switch (instant.getAction()) { + case HoodieTimeline.CLEAN_ACTION: { + // CLEAN is synced from the + // - inflight instant which contains the HoodieCleanerPlan, or + // - complete instant which contains the HoodieCleanMetadata + try { + HoodieInstant inflightCleanInstant = new HoodieInstant(true, instant.getAction(), instant.getTimestamp()); + ValidationUtils.checkArgument(inflightCleanInstant.isInflight()); + HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant); + records = Option.of(convertMetadataToRecords(cleanerPlan, instant.getTimestamp())); + break; + } catch (HoodieIOException | IOException e) { + HoodieInstant cleanInstant = new HoodieInstant(false, instant.getAction(), instant.getTimestamp()); + ValidationUtils.checkArgument(cleanInstant.isCompleted()); + HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant); + records = Option.of(convertMetadataToRecords(cleanMetadata, instant.getTimestamp())); + break; + } + } + case HoodieTimeline.DELTA_COMMIT_ACTION: + case HoodieTimeline.COMMIT_ACTION: + case HoodieTimeline.COMPACTION_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + records = Option.of(convertMetadataToRecords(commitMetadata, instant.getTimestamp())); + break; + } + case HoodieTimeline.ROLLBACK_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata( + timeline.getInstantDetails(instant).get()); + records = Option.of(convertMetadataToRecords(rollbackMetadata, instant.getTimestamp())); + break; + } + case HoodieTimeline.RESTORE_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata( + timeline.getInstantDetails(instant).get()); + records = Option.of(convertMetadataToRecords(restoreMetadata, instant.getTimestamp())); + break; + } + case HoodieTimeline.SAVEPOINT_ACTION: { + ValidationUtils.checkArgument(instant.isCompleted()); + // Nothing to be done here + break; + } + default: { + throw new HoodieException("Unknown type of action " + instant.getAction()); + } + } + return records; + } + + /** + * Finds all new files/partitions created as part of commit and creates metadata table records for them + * + * @param commitMetadata + * @param instantTime + * @return a list of metadata table records + */ + public static List convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String instantTime) { + + List records = new LinkedList<>(); + List allPartitions = new LinkedList<>(); + 
commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, writeStats) -> { + final String partition = partitionStatName.equals("") ? NON_PARTITIONED_NAME : partitionStatName; + allPartitions.add(partition); + + Map newFiles = new HashMap<>(writeStats.size()); + writeStats.forEach(hoodieWriteStat -> { + String pathWithPartition = hoodieWriteStat.getPath(); + if (pathWithPartition == null) { + // Empty partition + return; + } + + int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1; + String filename = pathWithPartition.substring(offset); + ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata"); + newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes()); + }); + + // New files added to a partition + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord( + partition, Option.of(newFiles), Option.empty()); + records.add(record); + }); + + // New partitions created + HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new ArrayList(allPartitions)); + records.add(record); + + LOG.info("Found at " + instantTime + " from Commit/" + commitMetadata.getOperationType() + + ". #partitions_updated=" + records.size()); + + return records; + } + + /** + * Finds all files that will be deleted as part of a planned clean and creates metadata table records for them + * + * @param cleanerPlan from timeline to convert + * @param instantTime + * @return a list of metadata table records + */ + public static List convertMetadataToRecords(HoodieCleanerPlan cleanerPlan, String instantTime) { + List records = new LinkedList<>(); + + int[] fileDeleteCount = {0}; + cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> { + fileDeleteCount[0] += deletedPathInfo.size(); + + // Files deleted from a partition + List deletedFilenames = deletedPathInfo.stream().map(p -> new Path(p.getFilePath()).getName()) + .collect(Collectors.toList()); + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), + Option.of(deletedFilenames)); + records.add(record); + }); + + LOG.info("Found at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size() + + ", #files_deleted=" + fileDeleteCount[0]); + + return records; + } + + /** + * Finds all files that were deleted as part of a clean and creates metadata table records for them + * + * @param cleanMetadata + * @param instantTime + * @return a list of metadata table records + */ + public static List convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) { + List records = new LinkedList<>(); + int[] fileDeleteCount = {0}; + + cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> { + // Files deleted from a partition + List deletedFiles = partitionMetadata.getSuccessDeleteFiles(); + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(), + Option.of(new ArrayList<>(deletedFiles))); + + records.add(record); + fileDeleteCount[0] += deletedFiles.size(); + }); + + LOG.info("Found at " + instantTime + " from Clean. 
#partitions_updated=" + records.size() + + ", #files_deleted=" + fileDeleteCount[0]); + + return records; + } + + /** + * Aggregates all files deleted and appended to from all rollbacks associated with a restore operation then + * creates metadata table records for them + * + * @param restoreMetadata + * @param instantTime + * @return a list of metadata table records + */ + public static List convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String instantTime) { + + Map> partitionToAppendedFiles = new HashMap<>(); + Map> partitionToDeletedFiles = new HashMap<>(); + restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> { + rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, partitionToAppendedFiles)); + }); + + + return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Restore"); + } + + public static List convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String instantTime) { + + Map> partitionToAppendedFiles = new HashMap<>(); + Map> partitionToDeletedFiles = new HashMap<>(); + processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles); + return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback"); + } + + /** + * Extracts information about the deleted and append files from the {@code HoodieRollbackMetadata}. + * + * During a rollback files may be deleted (COW, MOR) or rollback blocks be appended (MOR only) to files. This + * function will extract this change file for each partition. + * + * @param rollbackMetadata {@code HoodieRollbackMetadata} + * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. + * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. 
+ */ + private static void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata, + Map> partitionToDeletedFiles, + Map> partitionToAppendedFiles) { + rollbackMetadata.getPartitionMetadata().values().forEach(pm -> { + final String partition = pm.getPartitionPath(); + + if (!pm.getSuccessDeleteFiles().isEmpty()) { + if (!partitionToDeletedFiles.containsKey(partition)) { + partitionToDeletedFiles.put(partition, new ArrayList<>()); + } + + // Extract deleted file name from the absolute paths saved in getSuccessDeleteFiles() + List deletedFiles = pm.getSuccessDeleteFiles().stream().map(p -> new Path(p).getName()) + .collect(Collectors.toList()); + partitionToDeletedFiles.get(partition).addAll(deletedFiles); + } + + if (!pm.getAppendFiles().isEmpty()) { + if (!partitionToAppendedFiles.containsKey(partition)) { + partitionToAppendedFiles.put(partition, new HashMap<>()); + } + + // Extract appended file name from the absolute paths saved in getAppendFiles() + pm.getAppendFiles().forEach((path, size) -> { + partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, (oldSize, newSizeCopy) -> { + return size + oldSize; + }); + }); + } + }); + } + + private static List convertFilesToRecords(Map> partitionToDeletedFiles, + Map> partitionToAppendedFiles, String instantTime, + String operation) { + List records = new LinkedList<>(); + int[] fileChangeCount = {0, 0}; // deletes, appends + + partitionToDeletedFiles.forEach((partition, deletedFiles) -> { + + fileChangeCount[0] += deletedFiles.size(); + + Option> filesAdded = Option.empty(); + if (partitionToAppendedFiles.containsKey(partition)) { + filesAdded = Option.of(partitionToAppendedFiles.remove(partition)); + } + + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded, + Option.of(new ArrayList<>(deletedFiles))); + records.add(record); + }); + + partitionToAppendedFiles.forEach((partition, appendedFileMap) -> { + fileChangeCount[1] += appendedFileMap.size(); + + // Validate that no appended file has been deleted + ValidationUtils.checkState( + !appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition, Collections.emptyList())), + "Rollback file cannot both be appended and deleted"); + + // New files added to a partition + HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.of(appendedFileMap), + Option.empty()); + records.add(record); + }); + + LOG.info("Found at " + instantTime + " from " + operation + ". #partitions_updated=" + records.size() + + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + fileChangeCount[1]); + + return records; + } + +}
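
For reference, a minimal sketch of the writer-side flow this patch converges on: each unsynced instant is converted to metadata records by the new HoodieTableMetadataTimelineUtil and committed to the FILES partition of the metadata table. The surrounding context is assumed rather than taken from the patch: datasetMetaClient and instantsToSync come from HoodieBackedTableMetadataWriter, commitRecords stands in for the writer's prepRecords(...)/commit(...) pipeline, and the generic parameters (stripped in the diff above) are filled back in as assumptions.

import java.io.IOException;
import java.util.List;

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieTableMetadataTimelineUtil;

class MetadataSyncSketch {

  // Placeholder for the writer's prepRecords(...) + commit(...) pipeline against the
  // metadata table's FILES partition.
  static void commitRecords(List<HoodieRecord> records, String instantTime) {
    // ...
  }

  // Mirrors the simplified syncFromInstants() loop: action-specific handling for
  // commit/deltacommit/compaction, clean (plan or completed), rollback and restore
  // now lives entirely in HoodieTableMetadataTimelineUtil; savepoints yield
  // Option.empty() and are skipped.
  static void syncUnsyncedInstants(HoodieTableMetaClient datasetMetaClient,
                                   List<HoodieInstant> instantsToSync) throws IOException {
    for (HoodieInstant instant : instantsToSync) {
      Option<List<HoodieRecord>> records =
          HoodieTableMetadataTimelineUtil.convertInstantToMetaRecords(datasetMetaClient, instant);
      if (records.isPresent()) {
        commitRecords(records.get(), instant.getTimestamp());
      }
    }
  }
}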
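
On the read path, HoodieBackedTableMetadata now builds a HoodieMetadataMergedInstantRecordScanner over the instants that have not yet been synced, and a record returned by that scanner drives the preCombine against whatever was read from the metadata table's base and log files. The sketch below shows that merge in isolation; unsyncedInstants, schema, maxMemorySizeInBytes, spillableMapDirectory and storedRecord are placeholders for the reader's state, and the HoodieMetadataPayload generics (stripped in the diff above) are an assumption.

import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.metadata.HoodieMetadataMergedInstantRecordScanner;
import org.apache.hudi.metadata.HoodieMetadataPayload;

class TimelineMergeSketch {

  static HoodieRecord<HoodieMetadataPayload> mergeWithUnsyncedTimeline(
      HoodieTableMetaClient datasetMetaClient, List<HoodieInstant> unsyncedInstants,
      Schema schema, Long maxMemorySizeInBytes, String spillableMapDirectory,
      HoodieRecord<HoodieMetadataPayload> storedRecord, String key) throws IOException {

    // No merge-key filter (null): records for every key found in the unsynced instants
    // are kept in the scanner's spillable map, matching how openBaseAndLogFiles() wires
    // the scanner up in this patch.
    HoodieMetadataMergedInstantRecordScanner scanner = new HoodieMetadataMergedInstantRecordScanner(
        datasetMetaClient, unsyncedInstants, schema, maxMemorySizeInBytes, spillableMapDirectory, null);

    Option<HoodieRecord<HoodieMetadataPayload>> timelineRecord = scanner.getRecordByKey(key);
    if (timelineRecord.isPresent()) {
      // Timeline records are newer than the stored record, so their payload wins the merge.
      HoodieMetadataPayload merged = timelineRecord.get().getData().preCombine(storedRecord.getData());
      return new HoodieRecord<>(storedRecord.getKey(), merged);
    }
    return storedRecord;
  }
}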