diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 69758f251cc4f..61e89e3daf904 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -93,6 +93,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT .toString(); + public static final String DELETE_MARKER_FIELD_PROP = "hoodie.write.delete.marker.field"; + public static final String DEFAULT_DELETE_MARKER_FIELD = "_hoodie_is_deleted"; + public static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; @@ -266,6 +269,10 @@ public BulkInsertSortMode getBulkInsertSortMode() { return BulkInsertSortMode.valueOf(sortMode.toUpperCase()); } + public String getDeleteMarkerField() { + return props.getProperty(DELETE_MARKER_FIELD_PROP); + } + /** * compaction properties. 
*/ @@ -900,6 +907,8 @@ public HoodieWriteConfig build() { setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE); setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE), BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE); + setDefaultOnCondition(props, !props.containsKey(DELETE_MARKER_FIELD_PROP), + DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD); // Make sure the props is propagated setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build()); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java index d8dffdf1e1222..0e4b18ad06884 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java @@ -36,6 +36,8 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements HoodieRecordPayload { + private String deleteMarkerField = "_hoodie_is_deleted"; + /** * */ @@ -47,6 +49,12 @@ public OverwriteWithLatestAvroPayload(Option record) { this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural order } + public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal, + String deleteMarkerField) { + this(record, orderingVal); + this.deleteMarkerField = deleteMarkerField; + } + @Override public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload another) { // pick the payload with greatest ordering value @@ -80,7 +88,7 @@ public Option getInsertValue(Schema schema) throws IOException { * @returns {@code true} if record represents a delete record. {@code false} otherwise. 
*/ private boolean isDeleteRecord(GenericRecord genericRecord) { - Object deleteMarker = genericRecord.get("_hoodie_is_deleted"); + Object deleteMarker = genericRecord.get(deleteMarkerField); return (deleteMarker instanceof Boolean && (boolean) deleteMarker); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java index 7c5951a7cac04..e2123672b73da 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java @@ -37,6 +37,8 @@ public class TestOverwriteWithLatestAvroPayload { private Schema schema; + String defaultDeleteMarkerField = "_hoodie_is_deleted"; + String deleteMarkerField = "delete_marker_field"; @BeforeEach public void setUp() throws Exception { @@ -44,26 +46,56 @@ public void setUp() throws Exception { new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null), new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", null), new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null), - new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN), "", false) + new Schema.Field(defaultDeleteMarkerField, Schema.create(Type.BOOLEAN), "", false), + new Schema.Field(deleteMarkerField, Schema.create(Type.BOOLEAN), "", false) )); } @Test - public void testActiveRecords() throws IOException { + public void testOverwriteWithLatestAvroPayload() throws IOException { GenericRecord record1 = new GenericData.Record(schema); record1.put("id", "1"); record1.put("partition", "partition0"); record1.put("ts", 0L); - record1.put("_hoodie_is_deleted", false); + record1.put(defaultDeleteMarkerField, false); + record1.put(deleteMarkerField, false); + // test1: set default marker field value to true and user defined to false GenericRecord record2 = 
new GenericData.Record(schema); record2.put("id", "2"); record2.put("partition", "partition1"); record2.put("ts", 1L); - record2.put("_hoodie_is_deleted", false); + record2.put(defaultDeleteMarkerField, true); + record2.put(deleteMarkerField, false); + + // set to user defined marker field with false, the record should be considered active. + assertActiveRecord(record1, record2, deleteMarkerField); + + // set to default marker field with true, the record should be considered deleted. + assertDeletedRecord(record1, record2, defaultDeleteMarkerField); + + // test2: set default marker field value to false and user defined to true + GenericRecord record3 = new GenericData.Record(schema); + record3.put("id", "2"); + record3.put("partition", "partition1"); + record3.put("ts", 1L); + record3.put(defaultDeleteMarkerField, false); + record3.put(deleteMarkerField, true); + + // set to user defined marker field with true, the record should be considered deleted. + assertDeletedRecord(record1, record3, deleteMarkerField); + + // set to default marker field with false, the record should be considered active. 
+ assertActiveRecord(record1, record3, defaultDeleteMarkerField); + } + + private void assertActiveRecord(GenericRecord record1, + GenericRecord record2, String field) throws IOException { + OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload( + record1, 1, field); + OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload( + record2, 2, field); - OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1); - OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(record2, 2); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -74,22 +106,12 @@ public void testActiveRecords() throws IOException { assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(), record2); } - @Test - public void testDeletedRecord() throws IOException { - GenericRecord record1 = new GenericData.Record(schema); - record1.put("id", "1"); - record1.put("partition", "partition0"); - record1.put("ts", 0L); - record1.put("_hoodie_is_deleted", false); - - GenericRecord delRecord1 = new GenericData.Record(schema); - delRecord1.put("id", "2"); - delRecord1.put("partition", "partition1"); - delRecord1.put("ts", 1L); - delRecord1.put("_hoodie_is_deleted", true); - - OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload(record1, 1); - OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload(delRecord1, 2); + private void assertDeletedRecord(GenericRecord record1, + GenericRecord delRecord1, String field) throws IOException { + OverwriteWithLatestAvroPayload payload1 = new OverwriteWithLatestAvroPayload( + record1, 1, field); + OverwriteWithLatestAvroPayload payload2 = new OverwriteWithLatestAvroPayload( + delRecord1, 2, field); assertEquals(payload1.preCombine(payload2), payload2); assertEquals(payload2.preCombine(payload1), payload2); @@ -99,5 +121,4 @@ public void testDeletedRecord() throws 
IOException { assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record1); assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent()); } - } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index ed7a4582e9027..841a74afd38ee 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -105,6 +105,7 @@ public class HoodieTestDataGenerator { + "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"}," + "{\"name\": \"weight\", \"type\": \"float\"}," + "{\"name\": \"nation\", \"type\": \"bytes\"}," + + "{\"name\": \"user_defined_delete_marker_field\", \"type\": \"boolean\", \"default\": false}," + "{\"name\":\"current_date\",\"type\": {\"type\": \"int\", \"logicalType\": \"date\"}}," + "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-micros\"}}," + "{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},"; @@ -122,7 +123,7 @@ public class HoodieTestDataGenerator { + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}"; public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); - public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6)," + public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,int,bigint,float,binary,boolean,int,bigint,decimal(10,6)," + "map,struct,array>,boolean"; public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); @@ -177,6 +178,18 @@ public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, Hoodi return null; } + public static List generateGenericRecords(int n, boolean isDeleteRecord, int instantTime) { + return IntStream.range(0, n).boxed().map(i -> { + String partitionPath = DEFAULT_FIRST_PARTITION_PATH; + HoodieKey key = new HoodieKey("id_" + i, partitionPath); + HoodieTestDataGenerator.KeyPartition kp = new HoodieTestDataGenerator.KeyPartition(); + kp.key = key; + kp.partitionPath = partitionPath; + return HoodieTestDataGenerator.generateGenericRecord( + key.getRecordKey(), "rider-" + instantTime, "driver-" + instantTime, instantTime, isDeleteRecord, false); + }).collect(Collectors.toList()); + } + /** * Generates a new avro record of the above nested schema format, * retaining the key if optionally provided. @@ -263,11 +276,11 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam rec.put("weight", RAND.nextFloat()); byte[] bytes = "Canada".getBytes(); rec.put("nation", ByteBuffer.wrap(bytes)); + rec.put("user_defined_delete_marker_field", isDeleteRecord); long currentTimeMillis = System.currentTimeMillis(); Date date = new Date(currentTimeMillis); rec.put("current_date", (int) date.toLocalDate().toEpochDay()); rec.put("current_ts", currentTimeMillis); - BigDecimal bigDecimal = new BigDecimal(String.format("%5f", RAND.nextFloat())); Schema decimalSchema = AVRO_SCHEMA.getField("height").schema(); Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion(); @@ -290,11 +303,7 @@ public static GenericRecord generateGenericRecord(String rowKey, String riderNam rec.put("tip_history", tipHistoryArray); } - if (isDeleteRecord) { - rec.put("_hoodie_is_deleted", true); - } else { - rec.put("_hoodie_is_deleted", false); - } + rec.put("_hoodie_is_deleted", isDeleteRecord); return rec; } @@ -761,8 +770,8 @@ public int getNumExistingKeys(String schemaStr) { public 
static class KeyPartition implements Serializable { - HoodieKey key; - String partitionPath; + public HoodieKey key; + public String partitionPath; } public void close() { diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index a4e74720c6441..5740f4b3e2d13 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; @@ -215,11 +216,20 @@ private static Option createUserDefinedBulkInsertPartitio /** * Create a payload class via reflection, passing in an ordering/precombine value. 
*/ - public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal) - throws IOException { + public static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, + Comparable orderingVal, + String deleteMarkerField) throws IOException { try { - return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal); + HoodieRecordPayload payload = null; + if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) { + payload = (OverwriteWithLatestAvroPayload) ReflectionUtils.loadClass(payloadClass, + new Class[]{GenericRecord.class, Comparable.class, String.class}, + record, orderingVal, deleteMarkerField); + } else { + payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, + new Class[]{GenericRecord.class, Comparable.class}, record, orderingVal); + } + return payload; } catch (Throwable e) { throw new IOException("Could not create payload for class: " + payloadClass, e); } @@ -275,8 +285,9 @@ public static JavaRDD doDeleteOperation(HoodieWriteClient client, J } public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, - String payloadClass) throws IOException { - HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal); + String payloadClass, + String deleteMarkerField) throws IOException { + HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal, deleteMarkerField); return new HoodieRecord<>(hKey, payload); } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 5195f05742730..bb26302b91479 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -184,6 +184,13 @@ object 
DataSourceWriteOptions { val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class" val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName + /** + * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue. When two records have the same + * key value, we will check if the new record is deleted by the delete field. + */ + val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field" + val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted" + /** * Record key field. Value to be used as the `recordKey` component of `HoodieKey`. Actual value * will be obtained by invoking .toString() on the field value. Nested fields can be specified using diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 3ba34cb2671aa..f8f388e248ba1 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -110,7 +110,9 @@ private[hudi] object HoodieSparkSqlWriter { val orderingVal = DataSourceUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false) .asInstanceOf[Comparable[_]] DataSourceUtils.createHoodieRecord(gr, - orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) + orderingVal, keyGenerator.getKey(gr), + parameters(PAYLOAD_CLASS_OPT_KEY), + parameters(DELETE_FIELD_OPT_KEY)) }).toJavaRDD() // Handle various save modes @@ -202,6 +204,7 @@ private[hudi] object HoodieSparkSqlWriter { TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL, PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL, PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL, + DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL, RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL, PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL, KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL, diff --git 
a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index 7f264814ccda6..29dac2b0f3090 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -100,6 +100,52 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { } } + test("test OverwriteWithLatestAvroPayload with user defined delete field") { + val session = SparkSession.builder() + .appName("test_append_mode") + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate() + val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1") + + try { + val sqlContext = session.sqlContext + val hoodieFooTableName = "hoodie_foo_tbl" + + val keyField = "id" + val deleteMarkerField = "delete_field" + + //create a new table + val fooTableModifier = Map("path" -> path.toAbsolutePath.toString, + HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName, + "hoodie.insert.shuffle.parallelism" -> "2", + "hoodie.upsert.shuffle.parallelism" -> "2", + DELETE_FIELD_OPT_KEY -> deleteMarkerField, + RECORDKEY_FIELD_OPT_KEY -> keyField) + val fooTableParams = HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier) + + val id1 = UUID.randomUUID().toString + val dataFrame = session.createDataFrame(Seq( + (id1, 1, false) + )) toDF(keyField, "ts", deleteMarkerField) + + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, dataFrame) + val recordCount1 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count + assert(recordCount1 == 1, "result should be 1, but get " + recordCount1) + + val dataFrame2 = session.createDataFrame(Seq( + (id1, 2, true) + )) toDF(keyField, "ts", deleteMarkerField) + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, 
fooTableParams, dataFrame2) + + val recordCount2 = sqlContext.read.format("org.apache.hudi").load(path.toString + "/*/*.parquet").count() + assert(recordCount2 == 0, "result should be 0, but get " + recordCount2) + } finally { + session.stop() + FileUtils.deleteDirectory(path.toFile) + } + } + case class Test(uuid: String, ts: Long) } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index e7226fbe6caef..0db486e1118ca 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -338,9 +338,12 @@ public Pair>> readFromSource( } JavaRDD avroRDD = avroRDDOptional.get(); + String deleteMarkerField = props.getString(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, + HoodieWriteConfig.DEFAULT_DELETE_MARKER_FIELD); JavaRDD records = avroRDD.map(gr -> { HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr, - (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false)); + (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false), + deleteMarkerField); return new HoodieRecord<>(keyGenerator.getKey(gr), payload); }); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java new file mode 100644 index 0000000000000..f98eb793379c5 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.functional; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; +import org.apache.hudi.utilities.sources.ParquetDFSSource; +import org.apache.hudi.utilities.testutils.UtilitiesTestBase; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends UtilitiesTestBase { + private static String PARQUET_SOURCE_ROOT; + private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties"; + + @BeforeAll + public static void initClass() throws Exception { + UtilitiesTestBase.initClass(true); + PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles"; + + // prepare the configs. 
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", dfs, dfsBasePath + "/base.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties", dfs, + dfsBasePath + "/sql-transformer.properties"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc", dfs, dfsBasePath + "/source-flattened.avsc"); + UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", dfs, dfsBasePath + "/target.avsc"); + } + + @Test + public void testOverwriteLatestAvroPayload() throws Exception { + // test defaultDeleteMarkerField + this.testOverwriteLatestAvroPayload(null); + + // test userDefinedDeleteMarkerField + this.testOverwriteLatestAvroPayload("user_defined_delete_marker_field"); + } + + private void testOverwriteLatestAvroPayload(String deleteMarkerField) throws Exception { + String path = PARQUET_SOURCE_ROOT + "/1.parquet"; + List records = HoodieTestDataGenerator.generateGenericRecords(5, false, 0); + Helpers.saveParquetToDFS(records, new Path(path)); + + TypedProperties parquetProps = new TypedProperties(); + parquetProps.setProperty("include", "base.properties"); + parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key"); + parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there"); + parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT); + if (deleteMarkerField != null) { + parquetProps.setProperty(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP, deleteMarkerField); + } + Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET); + + String tableBasePath = dfsBasePath + "/test_overwrite_lastest_avro_payload_table"; + + HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer( + TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, 
HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(), + null, PROPS_FILENAME_TEST_PARQUET, false, + false, 100000, false, null, null, "timestamp"), jsc); + deltaStreamer.sync(); + TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath + "/*/*.parquet", sqlContext); + + String path2 = PARQUET_SOURCE_ROOT + "/2.parquet"; + List records2 = HoodieTestDataGenerator.generateGenericRecords(4, true, 1); + Helpers.saveParquetToDFS(records2, new Path(path2)); + deltaStreamer.sync(); + + List rows = sqlContext.read().format("org.apache.hudi").load(tableBasePath + "/*/*.parquet").collectAsList(); + assertEquals(1, rows.size()); + assertEquals(records.get(4).get("_row_key"), rows.get(0).getString(2)); + } +} diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc index f5cc97f56e2e4..bda782e01176b 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc @@ -55,6 +55,10 @@ },{ "name" : "nation", "type" : "bytes" + },{ + "name" : "user_defined_delete_marker_field", + "type" : "boolean", + "default" : false },{ "name" : "current_date", "type" : { diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties index dc735e8032d27..5eaa0a72d28d7 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties +++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties @@ -16,4 +16,4 @@ # limitations under the License. 
### include=base.properties -hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a +hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.user_defined_delete_marker_field, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM a diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc index a02610793a5da..a17e3ddf1b3db 100644 --- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc +++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc @@ -55,6 +55,10 @@ }, { "name" : "nation", "type" : "bytes" + },{ + "name" : "user_defined_delete_marker_field", + "type" : "boolean", + "default" : false },{ "name" : "current_date", "type" : {