Skip to content

Commit

Permalink
[HUDI-8076] Timeline - Support 0.x and 1.x implementation for timeline and related classes (apache#11923)
Browse files Browse the repository at this point in the history

[HUDI-8076] Timeline - Support 0.x and 1.x implementation for timeline and related classes
---------
Co-authored-by: Balaji Varadarajan <[email protected]>
Co-authored-by: Balaji Varadarajan <[email protected]>
  • Loading branch information
bvaradar authored Nov 15, 2024
1 parent 7107995 commit df9c4d6
Show file tree
Hide file tree
Showing 517 changed files with 10,771 additions and 5,537 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,7 @@ public void close() {
@Override
public void updateLastCommitTimeSynced(String tableName) {
HoodieTimeline activeTimeline = getActiveTimeline();
Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::requestedTime);
Option<String> lastCommitCompletionSynced = activeTimeline
.getInstantsOrderedByCompletionTime()
.skip(activeTimeline.countInstants() - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.DefaultInstantFileNameGenerator;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
Expand Down Expand Up @@ -112,7 +112,7 @@ public static void createHoodieTable() throws IOException {

String instantTime = "101";
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(false);
createMetaFile(basePath, HoodieTimeline.makeCommitFileName(instantTime), commitMetadata);
createMetaFile(basePath, new DefaultInstantFileNameGenerator().makeCommitFileName(instantTime), commitMetadata);
}

public static MessageType getSimpleSchema() {
Expand Down
13 changes: 2 additions & 11 deletions hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
Expand All @@ -48,7 +46,6 @@ public class HoodieCLI {
public static String basePath;
protected static HoodieTableMetaClient tableMetadata;
public static HoodieTableMetaClient syncTableMetadata;
public static TimelineLayoutVersion layoutVersion;
public static TempViewProvider tempViewProvider;

/**
Expand All @@ -74,11 +71,6 @@ private static void setBasePath(String basePath) {
HoodieCLI.basePath = basePath;
}

private static void setLayoutVersion(Integer layoutVersion) {
HoodieCLI.layoutVersion = new TimelineLayoutVersion(
(layoutVersion == null) ? TimelineLayoutVersion.CURR_VERSION : layoutVersion);
}

public static boolean initConf() {
if (HoodieCLI.conf == null) {
HoodieCLI.conf = HadoopFSUtils.getStorageConf(
Expand All @@ -101,12 +93,11 @@ public static void refreshTableMetadata() {
.setConf(HoodieCLI.conf.newInstance()).setBasePath(basePath).setLoadActiveTimelineOnLoad(false)
.setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
.setTimeGeneratorConfig(timeGeneratorConfig == null ? HoodieTimeGeneratorConfig.defaultConfig(basePath) : timeGeneratorConfig)
.setLayoutVersion(Option.of(layoutVersion)).build());
.build());
}

public static void connectTo(String basePath, Integer layoutVersion) {
public static void connectTo(String basePath) {
setBasePath(basePath);
setLayoutVersion(layoutVersion);
refreshTableMetadata();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.utilities.UtilHelpers;

Expand Down Expand Up @@ -72,14 +72,14 @@ public String showCleans(
defaultValue = "false") final boolean headerOnly)
throws IOException {

HoodieDefaultTimeline activeTimeline = CLIUtils.getTimelineInRange(startTs, endTs, includeArchivedTimeline);
HoodieTimeline activeTimeline = CLIUtils.getTimelineInRange(startTs, endTs, includeArchivedTimeline);
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
List<HoodieInstant> cleans = timeline.getReverseOrderedInstants().collect(Collectors.toList());
List<Comparable[]> rows = new ArrayList<>();
for (HoodieInstant clean : cleans) {
HoodieCleanMetadata cleanMetadata =
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
rows.add(new Comparable[] {clean.getTimestamp(), cleanMetadata.getEarliestCommitToRetain(),
rows.add(new Comparable[] {clean.requestedTime(), cleanMetadata.getEarliestCommitToRetain(),
cleanMetadata.getTotalFilesDeleted(), cleanMetadata.getTimeTakenInMillis()});
}

Expand All @@ -103,7 +103,7 @@ public String showCleanPartitions(

HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline = activeTimeline.getCleanerTimeline().filterCompletedInstants();
HoodieInstant cleanInstant = new HoodieInstant(false, HoodieTimeline.CLEAN_ACTION, instantTime);
HoodieInstant cleanInstant = HoodieCLI.getTableMetaClient().createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, instantTime);

if (!timeline.containsInstant(cleanInstant)) {
return "Clean " + instantTime + " not found in metadata " + timeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparator;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
Expand All @@ -48,6 +48,8 @@
import java.util.stream.Collectors;

import static org.apache.hudi.cli.utils.CommitUtil.getTimeDaysAgo;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.table.timeline.TimelineUtils.getTimeline;

/**
Expand All @@ -56,23 +58,25 @@
@ShellComponent
public class CommitsCommand {

private String printCommits(HoodieDefaultTimeline timeline,
private String printCommits(HoodieTimeline timeline,
final Integer limit,
final String sortByField,
final boolean descending,
final boolean headerOnly,
final String tempTableName) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();
InstantComparator instantComparator = HoodieCLI.getTableMetaClient().getTimelineLayout().getInstantComparator();

final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstantsAsStream().sorted(HoodieInstant.INSTANT_TIME_COMPARATOR.reversed()).collect(Collectors.toList());
.getInstantsAsStream().sorted(instantComparator.requestedTimeOrderedComparator().reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
final HoodieCommitMetadata commitMetadata = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe().deserialize(
commit,
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);
rows.add(new Comparable[] {commit.getTimestamp(),
rows.add(new Comparable[] {commit.requestedTime(),
commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalFilesInsert(),
commitMetadata.fetchTotalFilesUpdated(),
Expand All @@ -94,28 +98,30 @@ private String printCommits(HoodieDefaultTimeline timeline,
limit, headerOnly, rows, tempTableName);
}

private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
private String printCommitsWithMetadata(HoodieTimeline timeline,
final Integer limit, final String sortByField,
final boolean descending,
final boolean headerOnly,
final String tempTableName,
final String partition) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();
InstantComparator instantComparator = HoodieCLI.getTableMetaClient().getTimelineLayout().getInstantComparator();

final List<HoodieInstant> commits = timeline.getCommitsTimeline().filterCompletedInstants()
.getInstantsAsStream().sorted(HoodieInstant.INSTANT_TIME_COMPARATOR.reversed()).collect(Collectors.toList());
.getInstantsAsStream().sorted(instantComparator.requestedTimeOrderedComparator().reversed()).collect(Collectors.toList());

for (final HoodieInstant commit : commits) {
if (timeline.getInstantDetails(commit).isPresent()) {
final HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
final HoodieCommitMetadata commitMetadata = HoodieCLI.getTableMetaClient().getCommitMetadataSerDe().deserialize(
commit,
timeline.getInstantDetails(commit).get(),
HoodieCommitMetadata.class);

for (Map.Entry<String, List<HoodieWriteStat>> partitionWriteStat :
commitMetadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat hoodieWriteStat : partitionWriteStat.getValue()) {
if (StringUtils.isNullOrEmpty(partition) || partition.equals(hoodieWriteStat.getPartitionPath())) {
rows.add(new Comparable[] {commit.getAction(), commit.getTimestamp(), hoodieWriteStat.getPartitionPath(),
rows.add(new Comparable[] {commit.getAction(), commit.requestedTime(), hoodieWriteStat.getPartitionPath(),
hoodieWriteStat.getFileId(), hoodieWriteStat.getPrevCommit(), hoodieWriteStat.getNumWrites(),
hoodieWriteStat.getNumInserts(), hoodieWriteStat.getNumDeletes(),
hoodieWriteStat.getNumUpdateWrites(), hoodieWriteStat.getTotalWriteErrors(),
Expand Down Expand Up @@ -155,7 +161,7 @@ public String showCommits(
defaultValue = "false") final boolean includeArchivedTimeline)
throws IOException {

HoodieDefaultTimeline timeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline timeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
if (includeExtraMetadata) {
return printCommitsWithMetadata(timeline, limit, sortByField, descending, headerOnly, exportTableName, partition);
} else {
Expand Down Expand Up @@ -188,7 +194,7 @@ public String showArchivedCommits(
HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
try {
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
HoodieDefaultTimeline timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs);
HoodieTimeline timelineRange = (HoodieTimeline)archivedTimeline.findInstantsInRange(startTs, endTs);
if (includeExtraMetadata) {
return printCommitsWithMetadata(timelineRange, limit, sortByField, descending, headerOnly, exportTableName, partition);
} else {
Expand All @@ -214,7 +220,7 @@ public String showCommitPartitions(
defaultValue = "false") final boolean includeArchivedTimeline)
throws Exception {

HoodieDefaultTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline timeline = defaultTimeline.getCommitsTimeline().filterCompletedInstants();

Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, instantTime);
Expand Down Expand Up @@ -282,7 +288,7 @@ public String showWriteStats(
defaultValue = "false") final boolean includeArchivedTimeline)
throws Exception {

HoodieDefaultTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline timeline = defaultTimeline.getCommitsTimeline().filterCompletedInstants();

Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, instantTime);
Expand Down Expand Up @@ -328,7 +334,7 @@ public String showCommitFiles(
defaultValue = "false") final boolean includeArchivedTimeline)
throws Exception {

HoodieDefaultTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline defaultTimeline = getTimeline(HoodieCLI.getTableMetaClient(), includeArchivedTimeline);
HoodieTimeline timeline = defaultTimeline.getCommitsTimeline().filterCompletedInstants();

Option<HoodieInstant> hoodieInstantOption = getCommitForInstant(timeline, instantTime);
Expand Down Expand Up @@ -373,20 +379,20 @@ public String compareCommits(@ShellOption(value = {"--path"}, help = "Path of th
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String targetLatestCommit =
targetTimeline.getInstantsAsStream().iterator().hasNext() ? targetTimeline.lastInstant().get().getTimestamp() : "0";
targetTimeline.getInstantsAsStream().iterator().hasNext() ? targetTimeline.lastInstant().get().requestedTime() : "0";
String sourceLatestCommit =
sourceTimeline.getInstantsAsStream().iterator().hasNext() ? sourceTimeline.lastInstant().get().getTimestamp() : "0";
sourceTimeline.getInstantsAsStream().iterator().hasNext() ? sourceTimeline.lastInstant().get().requestedTime() : "0";

if (sourceLatestCommit != null
&& HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) {
&& compareTimestamps(targetLatestCommit, GREATER_THAN, sourceLatestCommit)) {
// source is behind the target
List<String> commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
.getInstantsAsStream().map(HoodieInstant::requestedTime).collect(Collectors.toList());
return "Source " + source.getTableConfig().getTableName() + " is behind by " + commitsToCatchup.size()
+ " commits. Commits to catch up - " + commitsToCatchup;
} else {
List<String> commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
.getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
.getInstantsAsStream().map(HoodieInstant::requestedTime).collect(Collectors.toList());
return "Source " + source.getTableConfig().getTableName() + " is ahead by " + commitsToCatchup.size()
+ " commits. Commits to catch up - " + commitsToCatchup;
}
Expand All @@ -406,9 +412,9 @@ public String syncCommits(@ShellOption(value = {"--path"}, help = "Path of the t
* */
private Option<HoodieInstant> getCommitForInstant(HoodieTimeline timeline, String instantTime) {
List<HoodieInstant> instants = Arrays.asList(
new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime),
new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime));
HoodieCLI.getTableMetaClient().createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, instantTime),
HoodieCLI.getTableMetaClient().createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime),
HoodieCLI.getTableMetaClient().createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, instantTime));

return Option.fromJavaOptional(instants.stream().filter(timeline::containsInstant).findAny());
}
Expand Down
Loading

0 comments on commit df9c4d6

Please sign in to comment.