diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 4df948e4131f9..e44ba53a39c88 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;
+import org.apache.hudi.utilities.sources.ParquetSource;
 import org.apache.hudi.utilities.sources.RowSource;
 import org.apache.hudi.utilities.sources.Source;
 import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -59,6 +60,8 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case AVRO:
         return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
+      case PARQUET:
+        return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
         AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -99,6 +102,18 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
                         .orElse(null)),
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
+      case PARQUET: {
+        InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
+        Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
+        return new InputBatch<>(
+            Option
+                .ofNullable(
+                    r.getBatch()
+                        .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
+                            source.getSparkSession()))
+                        .orElse(null)),
+            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+      }
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
new file mode 100644
index 0000000000000..22ac3f99c7dc0
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
+import org.apache.parquet.avro.AvroParquetInputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * DFS Source that reads parquet data
+ */
+public class ParquetDFSSource extends ParquetSource {
+
+  private final DFSPathSelector pathSelector;
+
+  public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
+  }
+
+  @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
+    Pair<Option<String>, String> selectPathsWithMaxModificationTime =
+        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
+    return selectPathsWithMaxModificationTime.getLeft()
+        .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
+        .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
+  }
+
+  private JavaRDD<GenericRecord> fromFiles(String pathStr) {
+    JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
+        Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
+    return avroRDD.values();
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
new file mode 100644
index 0000000000000..edcc6883ab350
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {
+
+  public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
+  }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index ea57f4b4771eb..0ed1e6c838317 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -34,7 +34,7 @@ public abstract class Source<T> implements Serializable {
   protected static volatile Logger log = LogManager.getLogger(Source.class);
 
   public enum SourceType {
-    JSON, AVRO, ROW
+    JSON, AVRO, ROW, PARQUET
   }
 
   protected transient TypedProperties props;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 2125483b8cbe7..46b0dab9000c7 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -23,23 +23,31 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
+import java.util.ArrayList;
 import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.server.HiveServer2;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.minicluster.HdfsTestService;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.TypedProperties;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
 import org.apache.hudi.hive.util.HiveTestService;
 import org.apache.hudi.utilities.sources.TestDataSource;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
@@ -178,6 +186,15 @@ public static void saveStringsToDFS(String[] lines, FileSystem fs, String target
     os.close();
   }
 
+  public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
+        .withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
+      for (GenericRecord record : records) {
+        writer.write(record);
+      }
+    }
+  }
+
   public static TypedProperties setupSchemaOnDFS() throws IOException {
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
     TypedProperties props = new TypedProperties();
@@ -185,6 +202,24 @@ public static TypedProperties setupSchemaOnDFS() throws IOException {
     return props;
   }
 
+  public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
+    try {
+      Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
+      return (GenericRecord) recordOpt.get();
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords,
+      HoodieTestDataGenerator dataGenerator) {
+    List<GenericRecord> records = new ArrayList<GenericRecord>();
+    for (HoodieRecord hoodieRecord : hoodieRecords) {
+      records.add(toGenericRecord(hoodieRecord, dataGenerator));
+    }
+    return records;
+  }
+
   public static String toJsonString(HoodieRecord hr) {
     try {
       return ((TestRawTripPayload) hr.getData()).getJsonData();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
index 9ee32855a5470..4d4fafbc9c566 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
@@ -19,10 +19,15 @@ package org.apache.hudi.utilities.sources;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
@@ -41,7 +46,7 @@
 import org.junit.Test;
 
 /**
- * Basic tests against all subclasses of {@link JsonDFSSource}
+ * Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}
  */
 public class TestDFSSource extends UtilitiesTestBase {
 
@@ -82,11 +87,17 @@ public void testJsonDFSSource() throws IOException {
     assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
     UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
         dfsBasePath + "/jsonFiles/1.json");
-    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
-    InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
+    // Test respecting sourceLimit
+    int sourceLimit = 10;
+    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true);
+    FileStatus file1Status = files.next();
+    assertTrue(file1Status.getLen() > sourceLimit);
+    assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+    // Test json -> Avro
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(100, fetch1.getBatch().get().count());
     // Test json -> Row format
-    InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
+    InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
     assertEquals(100, fetch1AsRows.getBatch().get().count());
     // Test Avro -> Row format
     Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
@@ -113,5 +124,69 @@ public void testJsonDFSSource() throws IOException {
     InputBatch<JavaRDD<GenericRecord>> fetch4 =
         jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
     assertEquals(Option.empty(), fetch4.getBatch());
+
+    // 5. Extract from the beginning
+    InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(10100, fetch5.getBatch().get().count());
+  }
+
+  @Test
+  public void testParquetDFSSource() throws IOException {
+    dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
+    ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
+    SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);
+
+    // 1. Extract without any checkpoint => get all the data, respecting sourceLimit
+    assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
+    List<GenericRecord> batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator);
+    Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
+    Helpers.saveParquetToDFS(batch1, file1);
+    // Test respecting sourceLimit
+    int sourceLimit = 10;
+    RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
+    FileStatus file1Status = files.next();
+    assertTrue(file1Status.getLen() > sourceLimit);
+    assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
+    // Test parquet -> Avro
+    InputBatch<JavaRDD<GenericRecord>> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(100, fetch1.getBatch().get().count());
+    // Test parquet -> Row
+    InputBatch<Dataset<Row>> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(100, fetch1AsRows.getBatch().get().count());
+
+    // 2. Produce new data, extract new data
+    List<GenericRecord> batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator);
+    Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
+    Helpers.saveParquetToDFS(batch2, file2);
+    // Test parquet -> Avro
+    InputBatch<JavaRDD<GenericRecord>> fetch2 =
+        parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(10000, fetch2.getBatch().get().count());
+    // Test parquet -> Row
+    InputBatch<Dataset<Row>> fetch2AsRows =
+        parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(10000, fetch2AsRows.getBatch().get().count());
+
+    // 3. Extract with previous checkpoint => gives same data back (idempotent)
+    InputBatch<Dataset<Row>> fetch3AsRows =
+        parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(10000, fetch3AsRows.getBatch().get().count());
+    assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
+    fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
+    Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
+    assertEquals(10000, rowDataset.count());
+
+    // 4. Extract with latest checkpoint => no new data returned
+    InputBatch<JavaRDD<GenericRecord>> fetch4 =
+        parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
+    assertEquals(Option.empty(), fetch4.getBatch());
+
+    // 5. Extract from the beginning
+    InputBatch<JavaRDD<GenericRecord>> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
+    assertEquals(10100, fetch5.getBatch().get().count());
+  }
 }
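
Usage note (not part of the patch): below is a minimal sketch of how the new source is consumed through SourceFormatAdapter, mirroring testParquetDFSSource above. The wrapper class ParquetDFSSourceExample, its readOnce method, and the /data/parquetFiles input directory are illustrative assumptions; the only Hudi and Spark types it calls are the ones introduced or exercised by this diff.

// Illustrative, self-contained sketch; class/method names and the input path are
// assumptions, not part of the patch.
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.ParquetDFSSource;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ParquetDFSSourceExample {

  // jsc, sparkSession and schemaProvider are assumed to be set up by the caller,
  // much like UtilitiesTestBase does for the test above.
  public static void readOnce(JavaSparkContext jsc, SparkSession sparkSession, SchemaProvider schemaProvider) {
    TypedProperties props = new TypedProperties();
    // Hypothetical DFS directory containing the parquet files to ingest.
    props.setProperty("hoodie.deltastreamer.source.dfs.root", "/data/parquetFiles");

    SourceFormatAdapter adapter =
        new SourceFormatAdapter(new ParquetDFSSource(props, jsc, sparkSession, schemaProvider));

    // Avro path: records come back as GenericRecords read via AvroParquetInputFormat.
    InputBatch<JavaRDD<GenericRecord>> avroBatch =
        adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    if (avroBatch.getBatch().isPresent()) {
      System.out.println("avro records: " + avroBatch.getBatch().get().count());
    }

    // Row path: the same files converted to Dataset<Row> through AvroConversionUtils;
    // passing Option.of(avroBatch.getCheckpointForNextBatch()) instead would only pick up newer files.
    InputBatch<Dataset<Row>> rowBatch =
        adapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
    if (rowBatch.getBatch().isPresent()) {
      System.out.println("rows: " + rowBatch.getBatch().get().count());
    }
  }
}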