[HUDI-1058] Make delete marker configurable (apache#1819)
shenh062326 authored Aug 3, 2020
1 parent 8aa9142 commit 433d7d2
Showing 13 changed files with 264 additions and 42 deletions.
HoodieWriteConfig:
@@ -93,6 +93,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString();
public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field";
public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted";


public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
@@ -266,6 +269,10 @@ public BulkInsertSortMode getBulkInsertSortMode() {
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}

public String getDeleteMarkerField() {
return props.getProperty(DELETE_MARKER_FIELD_PROP);
}

/**
* compaction properties.
*/
@@ -900,6 +907,8 @@ public HoodieWriteConfig build() {
setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
setDefaultOnCondition(props, !props.containsKey(DELETE_MARKER_FIELD_PROP),
DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD);

// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
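Usage sketch (not part of this commit): wiring the new property into a write config. The path and marker column are illustrative, and the builder's withProps is assumed to pass raw properties through.

Map<String, String> opts = new HashMap<>();
opts.put(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, "user_defined_delete_marker_field");

HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie_table")  // illustrative base path
    .withProps(opts)                // assumed raw-property pass-through
    .build();

// build() falls back to DEFAULT_DELETE_MARKER_FIELD when the key is absent
String markerField = config.getDeleteMarkerField();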
OverwriteWithLatestAvroPayload:
@@ -36,6 +36,8 @@
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {

private String deleteMarkerField = "_hoodie_is_deleted";

/**
*
*/
@@ -47,6 +49,12 @@ public OverwriteWithLatestAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order
}

public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal,
String deleteMarkerField) {
this(record, orderingVal);
this.deleteMarkerField = deleteMarkerField;
}

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) {
// pick the payload with greatest ordering value
@@ -80,7 +88,7 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
* @returns {@code true} if record represents a delete record. {@code false} otherwise.
*/
private boolean isDeleteRecord(GenericRecord genericRecord) {
- Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+ Object deleteMarker = genericRecord.get(deleteMarkerField);
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
}
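Behavior sketch (not part of this commit): the schema and marker column below are made up, and the empty result assumes getInsertValue applies the isDeleteRecord check shown above.

Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
    + "{\"name\":\"id\",\"type\":\"string\"},"
    + "{\"name\":\"is_removed\",\"type\":\"boolean\",\"default\":false}]}");

GenericRecord rec = new GenericData.Record(schema);
rec.put("id", "1");
rec.put("is_removed", true); // custom marker column instead of _hoodie_is_deleted

OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(rec, 1, "is_removed");

// the record is treated as a delete, so no insert value is produced
assert !payload.getInsertValue(schema).isPresent();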
TestOverwriteWithLatestAvroPayload:
@@ -37,33 +37,65 @@
public class TestOverwriteWithLatestAvroPayload {

private Schema schema;
String defaultDeleteMarkerField = "_hoodie_is_deleted";
String deleteMarkerField = "delete_marker_field";

@BeforeEach
public void setUp() throws Exception {
schema = Schema.createRecord(Arrays.asList(
new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null),
new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
- new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), "", false)
+ new Schema.Field(defaultDeleteMarkerField, Schema.create(Type.BOOLEAN), "", false),
+ new Schema.Field(deleteMarkerField, Schema.create(Type.BOOLEAN), "", false)
));
}

@Test
- public void testActiveRecords() throws IOException {
+ public void testOverwriteWithLatestAvroPayload() throws IOException {
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition0");
record1.put("ts", 0L);
record1.put("_hoodie_is_deleted", false);
record1.put(defaultDeleteMarkerField, false);
record1.put(deleteMarkerField, false);

// test 1: default marker field set to true, user-defined marker field set to false
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2");
record2.put("partition", "partition1");
record2.put("ts", 1L);
record2.put("_hoodie_is_deleted", false);
record2.put(defaultDeleteMarkerField, true);
record2.put(deleteMarkerField, false);

// with the user-defined marker field set to false, the record should be considered active.
assertActiveRecord(record1, record2, deleteMarkerField);

// with the default marker field set to true, the record should be considered deleted.
assertDeletedRecord(record1, record2, defaultDeleteMarkerField);

// test 2: default marker field set to false, user-defined marker field set to true
GenericRecord record3 = new GenericData.Record(schema);
record3.put("id", "2");
record3.put("partition", "partition1");
record3.put("ts", 1L);
record3.put(defaultDeleteMarkerField, false);
record3.put(deleteMarkerField, true);

// with the user-defined marker field set to true, the record should be considered deleted.
assertDeletedRecord(record1, record3, deleteMarkerField);

// with the default marker field set to false, the record should be considered active.
assertActiveRecord(record1, record3, defaultDeleteMarkerField);
}

+ private void assertActiveRecord(GenericRecord record1,
+ GenericRecord record2, String field) throws IOException {
+ OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(
+ record1, 1, field);
+ OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(
+ record2, 2, field);
- OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1);
- OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);

@@ -74,22 +106,12 @@ public void testActiveRecords() throws IOException {
assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2);
}

- @Test
- public void testDeletedRecord() throws IOException {
- GenericRecord record1 = new GenericData.Record(schema);
- record1.put("id", "1");
- record1.put("partition", "partition0");
- record1.put("ts", 0L);
- record1.put("_hoodie_is_deleted", false);
-
- GenericRecord delRecord1 = new GenericData.Record(schema);
- delRecord1.put("id", "2");
- delRecord1.put("partition", "partition1");
- delRecord1.put("ts", 1L);
- delRecord1.put("_hoodie_is_deleted", true);
-
- OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1);
- OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2);
+ private void assertDeletedRecord(GenericRecord record1,
+ GenericRecord delRecord1, String field) throws IOException {
+ OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(
+ record1, 1, field);
+ OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(
+ delRecord1, 2, field);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);

@@ -99,5 +121,4 @@
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1);
assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
}

}
HoodieTestDataGenerator:
@@ -105,6 +105,7 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
+ "{\"name\": \"weight\", \"type\": \"float\"},"
+ "{\"name\": \"nation\", \"type\": \"bytes\"},"
+ "{\"name\": \"user_defined_delete_marker_field\", \"type\": \"boolean\", \"default\": false},"
+ "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
+ "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}},"
+ "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
@@ -122,7 +123,7 @@ public class HoodieTestDataGenerator {
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
- public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
+ public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,boolean,int,bigint,decimal(10,6),"
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";

public static final Schema AVRO_SCHEMA = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
@@ -177,6 +178,18 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, Hoodi
return null;
}

public static List<GenericRecord> generateGenericRecords(int n, boolean isDeleteRecord, int instantTime) {
return IntStream.range(0, n).boxed().map(i -> {
String partitionPath = DEFAULT_FIRST_PARTITION_PATH;
HoodieKey key = new HoodieKey("id_" + i, partitionPath);
HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition();
kp.key = key;
kp.partitionPath = partitionPath;
return HoodieTestDataGenerator.generateGenericRecord(
key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false);
}).collect(Collectors.toList());
}

/**
* Generates a new avro record of the above nested schema format,
* retaining the key if optionally provided.
@@ -263,11 +276,11 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
rec.put("weight", RAND.nextFloat());
byte[] bytes = "Canada".getBytes();
rec.put("nation", ByteBuffer.wrap(bytes));
rec.put("user_defined_delete_marker_field", isDeleteRecord);
long currentTimeMillis = System.currentTimeMillis();
Date date = new Date(currentTimeMillis);
rec.put("current_date", (int) date.toLocalDate().toEpochDay());
rec.put("current_ts", currentTimeMillis);

BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat()));
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
@@ -290,11 +303,7 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam
rec.put("tip_history", tipHistoryArray);
}

- if (isDeleteRecord) {
- rec.put("_hoodie_is_deleted", true);
- } else {
- rec.put("_hoodie_is_deleted", false);
- }
+ rec.put("_hoodie_is_deleted", isDeleteRecord);
return rec;
}

@@ -761,8 +770,8 @@ public int getNumExistingKeys(String schemaStr) {

public static class KeyPartition implements Serializable {

- HoodieKey key;
- String partitionPath;
+ public HoodieKey key;
+ public String partitionPath;
}

public void close() {
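For illustration (counts and instant time are arbitrary), the new helper yields records whose marker columns are pre-set:

// 10 records for instant time 1, each with _hoodie_is_deleted and
// user_defined_delete_marker_field set to true
List<GenericRecord> deletes = HoodieTestDataGenerator.generateGenericRecords(10, true, 1);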
23 changes: 17 additions & 6 deletions hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -27,6 +27,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -215,11 +216,20 @@ private static Option<BulkInsertPartitioner> createUserDefinedBulkInsertPartitio
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
- public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
- throws IOException {
+ public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record,
+ Comparable orderingVal,
+ String deleteMarkerField) throws IOException {
try {
- return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
- new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+ HoodieRecordPayload payload = null;
+ if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) {
+ payload = (OverwriteWithLatestAvroPayload) ReflectionUtils.loadClass(payloadClass,
+ new Class<?>[]{GenericRecord.class, Comparable.class, String.class},
+ record, orderingVal, deleteMarkerField);
+ } else {
+ payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+ new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
+ }
+ return payload;
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
@@ -275,8 +285,9 @@ public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, J
}

public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
- String payloadClass) throws IOException {
- HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
+ String payloadClass,
+ String deleteMarkerField) throws IOException {
+ HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal, deleteMarkerField);
return new HoodieRecord<>(hKey, payload);
}

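Call-site sketch (not part of this commit): genericRecord, orderingVal and hoodieKey are assumed to come from the caller, and the marker column name is illustrative.

// resolves to the three-arg constructor branch above
HoodieRecordPayload payload = DataSourceUtils.createPayload(
    OverwriteWithLatestAvroPayload.class.getName(),
    genericRecord, orderingVal, "user_defined_delete_marker_field");

HoodieRecord record = DataSourceUtils.createHoodieRecord(
    genericRecord, orderingVal, hoodieKey,
    OverwriteWithLatestAvroPayload.class.getName(), "user_defined_delete_marker_field");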
DataSourceWriteOptions:
@@ -184,6 +184,13 @@ object DataSourceWriteOptions {
val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName

/**
* Field used by OverwriteWithLatestAvroPayload#combineAndGetUpdateValue: when two records share the
* same key, this field on the incoming record is checked to decide whether it represents a delete.
*/
val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted"

/**
* Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value
* will be obtained by invoking .toString() on the field value. Nested fields can be specified using
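Datasource usage sketch (not part of this commit; df is an existing Dataset<Row>, and column, table and path names are placeholders):

df.write().format("org.apache.hudi")
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.datasource.write.delete.field", "is_removed") // DELETE_FIELD_OPT_KEY
    .option("hoodie.table.name", "my_table")
    .mode(SaveMode.Append)
    .save("/tmp/hoodie_table");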
HoodieSparkSqlWriter:
@@ -110,7 +110,9 @@ private[hudi] object HoodieSparkSqlWriter {
val orderingVal = DataSourceUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
- orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+ orderingVal, keyGenerator.getKey(gr),
+ parameters(PAYLOAD_CLASS_OPT_KEY),
+ parameters(DELETE_FIELD_OPT_KEY))
}).toJavaRDD()

// Handle various save modes
@@ -202,6 +204,7 @@ private[hudi] object HoodieSparkSqlWriter {
TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL,
RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
HoodieSparkSqlWriterSuite:
@@ -100,6 +100,52 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
}

test("test OverwriteWithLatestAvroPayload with user defined delete field") {
val session = SparkSession.builder()
.appName("test_append_mode")
.master("local[2]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")

try {
val sqlContext = session.sqlContext
val hoodieFooTableName = "hoodie_foo_tbl"

val keyField = "id"
val deleteMarkerField = "delete_field"

// create a new table
val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
"hoodie.insert.shuffle.parallelism" -> "2",
"hoodie.upsert.shuffle.parallelism" -> "2",
DELETE_FIELD_OPT_KEY -> deleteMarkerField,
RECORDKEY_FIELD_OPT_KEY -> keyField)
val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)

val id1 = UUID.randomUUID().toString
val dataFrame = session.createDataFrame(Seq(
(id1, 1, false)
)).toDF(keyField, "ts", deleteMarkerField)

HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame)
val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count
assert(recordCount1 == 1, "result should be 1, but got " + recordCount1)

val dataFrame2 = session.createDataFrame(Seq(
(id1, 2, true)
)).toDF(keyField, "ts", deleteMarkerField)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame2)

val recordCount2 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count()
assert(recordCount2 == 0, "result should be 0, but got " + recordCount2)
} finally {
session.stop()
FileUtils.deleteDirectory(path.toFile)
}
}

case class Test(uuid: String, ts: Long)

}
DeltaSync (HoodieDeltaStreamer):
@@ -338,9 +338,12 @@ public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
}

JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+ String deleteMarkerField = props.getString(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP,
+ HoodieWriteConfig.DEFAULT_DELETE_MARKER_FIELD);
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false));
+ (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false),
+ deleteMarkerField);
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});

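Since this path resolves the marker through HoodieWriteConfig, the same override can be supplied with the write properties the DeltaStreamer already consumes, e.g. (value illustrative):

hoodie.write.delete.marker.field=user_defined_delete_marker_field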