[HUDI-242] Support for RFC-12/Bootstrapping of external datasets to hudi (apache#1876)

- [HUDI-418] Bootstrap Index Implementation using HFile with unit-test
 - [HUDI-421] FileSystem View Changes to support Bootstrap with unit-tests
 - [HUDI-424] Implement Query Side Integration for querying tables containing bootstrap file slices
 - [HUDI-423] Implement upsert functionality for handling updates to these bootstrap file slices
 - [HUDI-421] Bootstrap Write Client with tests
 - [HUDI-425] Added HoodieDeltaStreamer support
 - [HUDI-899] Add a knob to change partition-path style while performing metadata bootstrap
 - [HUDI-900] Metadata Bootstrap Key Generator needs to handle complex keys correctly
 - [HUDI-424] Simplify Record reader implementation
 - [HUDI-420] Hoodie Demo working with hive and sparkSQL. Also, Hoodie CLI working with bootstrap tables

Co-authored-by: Mehrotra <[email protected]>
Co-authored-by: Vinoth Chandar <[email protected]>
Co-authored-by: Balaji Varadarajan <[email protected]>
4 people authored Aug 4, 2020
1 parent 266bce1 commit 539621b
Showing 175 changed files with 7,535 additions and 774 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -206,6 +206,7 @@
This product includes code from Apache Hive.

* org.apache.hadoop.hive.ql.io.CombineHiveInputFormat copied to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
* org.apache.hadoop.hive.serde2.ColumnProjectionUtils copied and modified to org.apache.hudi.hadoop.HoodieColumnProjectionUtils

Copyright: 2011-2019 The Apache Software Foundation
Home page: http://hive.apache.org/
5 changes: 4 additions & 1 deletion docker/demo/compaction.commands
@@ -19,4 +19,7 @@ connect --path /user/hive/warehouse/stock_ticks_mor
compactions show all
compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1
compaction run --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1

connect --path /user/hive/warehouse/stock_ticks_mor_bs
compactions show all
compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1
compaction run --parallelism 2 --sparkMemory 1G --schemaFilePath /var/demo/config/schema.avsc --retry 1
8 changes: 8 additions & 0 deletions docker/demo/hive-batch1.commands
@@ -25,4 +25,12 @@ select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GO
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG';

select symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG';

!quit
6 changes: 6 additions & 0 deletions docker/demo/hive-batch2-after-compaction.commands
@@ -23,4 +23,10 @@ select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = '
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG';

select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG';
select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG';

select symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG';
select symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG';

!quit
6 changes: 6 additions & 0 deletions docker/demo/hive-incremental-cow.commands
@@ -23,5 +23,11 @@ set hoodie.stock_ticks_cow.consume.start.timestamp='${min.commit.time}';

select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';

set hoodie.stock_ticks_cow_bs.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_cow_bs.consume.max.commits=3;
set hoodie.stock_ticks_cow_bs.consume.start.timestamp='00000000000001';

select symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG' and `_hoodie_commit_time` > '00000000000001';

!quit

6 changes: 6 additions & 0 deletions docker/demo/hive-incremental-mor-ro.commands
@@ -23,5 +23,11 @@ set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}';

select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';

set hoodie.stock_ticks_mor_bs.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_mor_bs.consume.max.commits=3;
set hoodie.stock_ticks_mor_bs.consume.start.timestamp='00000000000001';

select symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '00000000000001';

!quit

6 changes: 6 additions & 0 deletions docker/demo/hive-incremental-mor-rt.commands
@@ -23,5 +23,11 @@ set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}';

select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';

set hoodie.stock_ticks_mor_bs.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_mor_bs.consume.max.commits=3;
set hoodie.stock_ticks_mor_bs.consume.start.timestamp='00000000000001';

select symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '00000000000001';

!quit

10 changes: 10 additions & 0 deletions docker/demo/sparksql-batch1.commands
@@ -27,4 +27,14 @@ spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)

// Bootstrapped Copy-On-Write table
spark.sql("select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG'").show(100, false)

// Bootstrapped Merge-On-Read table
spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG'").show(100, false)

System.exit(0)
10 changes: 10 additions & 0 deletions docker/demo/sparksql-batch2.commands
@@ -26,4 +26,14 @@ spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from s
spark.sql("select symbol, max(ts) from stock_ticks_mor_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG'").show(100, false)

// Copy-On-Write Bootstrapped table
spark.sql("select symbol, max(ts) from stock_ticks_cow_bs group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs where symbol = 'GOOG'").show(100, false)

// Bootstrapped Merge-On-Read table
spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_ro group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_ro where symbol = 'GOOG'").show(100, false)
spark.sql("select symbol, max(ts) from stock_ticks_mor_bs_rt group by symbol HAVING symbol = 'GOOG'").show(100, false)
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_bs_rt where symbol = 'GOOG'").show(100, false)

System.exit(0)
22 changes: 22 additions & 0 deletions docker/demo/sparksql-bootstrap-prep-source.commands
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.spark.sql.functions.col

val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
System.exit(0)
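
This new script only stages a plain parquet copy of the stock_ticks_cow data under /user/hive/warehouse/stock_ticks_cow_bs_src; the step that metadata-bootstraps that directory into the stock_ticks_cow_bs table is not shown in this file. As a rough, hedged sketch of what that step could look like through the Spark datasource path added in this change (the constants DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL and HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, the HoodieBootstrapConfig package location, and the record-key/partition-path field names are assumptions, not taken from this diff):

// Sketch only: metadata-bootstrap the staged parquet directory into a Hudi table.
// Option constants, the HoodieBootstrapConfig import, and field names are assumptions and may differ from the actual demo setup.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.spark.sql.SaveMode

spark.emptyDataFrame.write.format("org.apache.hudi").
  option(HoodieWriteConfig.TABLE_NAME, "stock_ticks_cow_bs").
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date").
  option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, "/user/hive/warehouse/stock_ticks_cow_bs_src").
  mode(SaveMode.Overwrite).
  save("/user/hive/warehouse/stock_ticks_cow_bs")

The stock_ticks_mor_bs variant used by the MOR demo commands would be produced the same way with the MERGE_ON_READ table type.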
34 changes: 32 additions & 2 deletions docker/demo/sparksql-incremental.commands
@@ -52,8 +52,38 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");

spark.sql("show tables").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_ro").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_rt").show(20, false)

System.exit(0);
val hoodieIncQueryBsDF = spark.read.format("org.apache.hudi").
option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "00000000000001").
load("/user/hive/warehouse/stock_ticks_cow_bs");
hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow_bs_incr where symbol = 'GOOG'").show(100, false);

spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, close from stock_ticks_cow_bs_incr").
write.format("org.apache.hudi").
option("hoodie.insert.shuffle.parallelism", "2").
option("hoodie.upsert.shuffle.parallelism","2").
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).
option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).
option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key").
option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "datestr").
option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts").
option(HoodieWriteConfig.TABLE_NAME, "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "stock_ticks_derived_mor_bs").
option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default").
option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000").
option(DataSourceWriteOptions.HIVE_USER_OPT_KEY, "hive").
option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");

spark.sql("show tables").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_bs_ro").show(20, false)
spark.sql("select count(*) from stock_ticks_derived_mor_bs_rt").show(20, false)

System.exit(0);
5 changes: 4 additions & 1 deletion hudi-cli/hudi-cli.sh
@@ -25,4 +25,7 @@ if [ -z "$CLIENT_JAR" ]; then
echo "Client jar location not set, please set it in conf/hudi-env.sh"
fi

java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:$DIR/target/lib/*:$HOODIE_JAR:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
OTHER_JARS=`ls ${DIR}/target/lib/* | grep -v 'hudi-[^/]*jar' | tr '\n' ':'`

echo "Running : java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:${HOODIE_JAR}:${OTHER_JARS}:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@"
java -cp ${HADOOP_CONF_DIR}:${SPARK_CONF_DIR}:${HOODIE_JAR}:${OTHER_JARS}:${CLIENT_JAR} -DSPARK_CONF_DIR=${SPARK_CONF_DIR} -DHADOOP_CONF_DIR=${HADOOP_CONF_DIR} org.springframework.shell.Bootstrap $@
26 changes: 6 additions & 20 deletions hudi-cli/pom.xml
@@ -139,26 +139,6 @@
</dependency>

<!-- Hoodie -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-hive-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-common</artifactId>
@@ -198,6 +178,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-utilities-bundle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Logging -->
<dependency>
<groupId>log4j</groupId>
@@ -213,7 +213,7 @@ public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefault
if (exitCode != 0) {
return "Failed to run compaction for " + compactionInstantTime;
}
return "Compaction successfully completed for " + compactionInstantTime;
return "Attempted to schedule compaction for " + compactionInstantTime;
}

@CliCommand(value = "compaction run", help = "Run Compaction for given instant time")
@@ -22,6 +22,7 @@
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -239,7 +240,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
FileStatus[] statuses = fs.globStatus(new Path(globPath));
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
Stream<HoodieInstant> instantsStream;

HoodieTimeline timeline;
@@ -269,6 +270,6 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m

HoodieTimeline filteredTimeline = new HoodieDefaultTimeline(instantsStream,
(Function<HoodieInstant, Option<byte[]>> & Serializable) metaClient.getActiveTimeline()::getInstantDetails);
return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses);
return new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new FileStatus[0]));
}
}
@@ -22,6 +22,7 @@
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -53,7 +54,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -82,7 +82,7 @@ public String showLogFileCommits(
throws IOException {

FileSystem fs = HoodieCLI.getTableMetaClient().getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
List<String> logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).stream()
.map(status -> status.getPath().toString()).collect(Collectors.toList());
Map<String, List<Tuple3<HoodieLogBlockType, Tuple2<Map<HeaderMetadataType, String>, Map<HeaderMetadataType, String>>, Integer>>> commitCountAndMetadata =
new HashMap<>();
@@ -175,7 +175,7 @@ public String showLogFileRecords(

HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
FileSystem fs = client.getFs();
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(logFilePathPattern)))
List<String> logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(logFilePathPattern)).stream()
.map(status -> status.getPath().toString()).sorted(Comparator.reverseOrder())
.collect(Collectors.toList());

@@ -118,7 +118,7 @@ public String fileSizeStats(

FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", HoodieCLI.getTableMetaClient().getBasePath(), globRegex);
FileStatus[] statuses = fs.globStatus(new Path(globPath));
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));

// max, min, #small files < 10MB, 50th, avg, 95th
Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES));
@@ -88,8 +88,7 @@ public String createTable(
@CliOption(key = {"archiveLogFolder"}, help = "Folder Name for storing archived timeline") String archiveFolder,
@CliOption(key = {"layoutVersion"}, help = "Specific Layout Version to use") Integer layoutVersion,
@CliOption(key = {"payloadClass"}, unspecifiedDefaultValue = "org.apache.hudi.common.model.HoodieAvroPayload",
help = "Payload Class") final String payloadClass)
throws IOException {
help = "Payload Class") final String payloadClass) throws IOException {

boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
@@ -62,6 +62,7 @@ public void init() throws IOException {
// Create table and connect
String tableName = "test_table";
tablePath = basePath + File.separator + tableName;

new TableCommand().createTable(
tablePath, tableName,
"COPY_ON_WRITE", "", 1, "org.apache.hudi.common.model.HoodieAvroPayload");
@@ -149,6 +149,35 @@ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieReco
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}

/**
* Main API to run bootstrap to hudi.
*/
public void bootstrap(Option<Map<String, String>> extraMetadata) {
if (rollbackPending) {
rollBackInflightBootstrap();
}
HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT);
table.bootstrap(jsc, extraMetadata);
}

/**
* Main API to rollback pending bootstrap.
*/
protected void rollBackInflightBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
Option<String> instant = Option.fromJavaOptional(
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
if (instant.isPresent() && HoodieTimeline.compareTimestamps(instant.get(), HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
LOG.info("Found pending bootstrap instants. Rolling them back");
table.rollbackBootstrap(jsc, HoodieActiveTimeline.createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}

}

/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
@@ -671,7 +700,13 @@ private void rollbackPendingCommits() {
List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
for (String commit : commits) {
rollback(commit);
if (HoodieTimeline.compareTimestamps(commit, HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
rollBackInflightBootstrap();
break;
} else {
rollback(commit);
}
}
}

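
The bootstrap(Option) entry point and the pending-bootstrap rollback added above can also be driven directly through the write client, outside the datasource and DeltaStreamer paths. A minimal, hedged sketch of doing so from a Spark shell follows; the HoodieBootstrapConfig builder methods, the withBootstrapConfig setter, and the client package location are assumptions, while bootstrap(Option<Map<String, String>>) itself is what this diff adds:

// Sketch only: drive the new HoodieWriteClient.bootstrap(...) API directly.
// Builder and package names flagged below are assumptions; bootstrap(Option) is the method added in this commit.
import org.apache.hudi.client.HoodieWriteClient            // package location is an assumption
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(spark.sparkContext)

// Assumed HoodieBootstrapConfig builder pointing at the externally-written source data.
val bootstrapConfig = HoodieBootstrapConfig.newBuilder().
  withBootstrapBasePath("/user/hive/warehouse/stock_ticks_cow_bs_src").
  build()

val writeConfig = HoodieWriteConfig.newBuilder().
  withPath("/user/hive/warehouse/stock_ticks_cow_bs").
  forTable("stock_ticks_cow_bs").
  withBootstrapConfig(bootstrapConfig).
  build()

val client = new HoodieWriteClient[HoodieAvroPayload](jsc, writeConfig)
// Per the change above, any pending bootstrap instant is rolled back before the bootstrap commit is applied.
client.bootstrap(Option.empty[java.util.Map[String, String]]())
client.close()

Passing a non-empty extraMetadata map lets callers stamp additional key/value context into the bootstrap commit metadata, mirroring the other write APIs on the client.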