Skip to content

Commit

Permalink
[HUDI-40] Add parquet support for the Delta Streamer (apache#949)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyli1019 authored and vinothchandar committed Oct 17, 2019
1 parent 7381b66 commit ed745df
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.ParquetSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
Expand Down Expand Up @@ -59,6 +60,8 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
case PARQUET:
return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new AvroConvertor(r.getSchemaProvider().getSourceSchema());
Expand Down Expand Up @@ -99,6 +102,18 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case PARQUET: {
InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
return new InputBatch<>(
Option
.ofNullable(
r.getBatch()
.map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
source.getSparkSession()))
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource) source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

/**
* DFS Source that reads parquet data
*/
public class ParquetDFSSource extends ParquetSource {

private final DFSPathSelector pathSelector;

public ParquetDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
this.pathSelector = new DFSPathSelector(props, this.sparkContext.hadoopConfiguration());
}

@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
}

private JavaRDD<GenericRecord> fromFiles(String pathStr) {
JavaPairRDD<Void, GenericRecord> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
return avroRDD.values();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.sources;

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {

public ParquetSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider, SourceType.PARQUET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public abstract class Source<T> implements Serializable {
protected static volatile Logger log = LogManager.getLogger(Source.class);

public enum SourceType {
JSON, AVRO, ROW
JSON, AVRO, ROW, PARQUET
}

protected transient TypedProperties props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,31 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.minicluster.HdfsTestService;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.util.HiveTestService;
import org.apache.hudi.utilities.sources.TestDataSource;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
Expand Down Expand Up @@ -178,13 +186,40 @@ public static void saveStringsToDFS(String[] lines, FileSystem fs, String target
os.close();
}

public static void saveParquetToDFS(List<GenericRecord> records, Path targetFile) throws IOException {
try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(targetFile)
.withSchema(HoodieTestDataGenerator.avroSchema).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {
for (GenericRecord record : records) {
writer.write(record);
}
}
}

public static TypedProperties setupSchemaOnDFS() throws IOException {
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", dfs, dfsBasePath + "/source.avsc");
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
return props;
}

public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
try {
Option<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
return (GenericRecord) recordOpt.get();
} catch (IOException e) {
return null;
}
}

public static List<GenericRecord> toGenericRecords(List<HoodieRecord> hoodieRecords,
HoodieTestDataGenerator dataGenerator) {
List<GenericRecord> records = new ArrayList<GenericRecord>();
for (HoodieRecord hoodieRecord : hoodieRecords) {
records.add(toGenericRecord(hoodieRecord, dataGenerator));
}
return records;
}

public static String toJsonString(HoodieRecord hr) {
try {
return ((TestRawTripPayload) hr.getData()).getJsonData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
package org.apache.hudi.utilities.sources;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
Expand All @@ -41,7 +46,7 @@
import org.junit.Test;

/**
* Basic tests against all subclasses of {@link JsonDFSSource}
* Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}
*/
public class TestDFSSource extends UtilitiesTestBase {

Expand Down Expand Up @@ -82,11 +87,17 @@ public void testJsonDFSSource() throws IOException {
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs,
dfsBasePath + "/jsonFiles/1.json");
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), 10).getBatch());
InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), 1000000);
// Test respecting sourceLimit
int sourceLimit = 10;
RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true);
FileStatus file1Status = files.next();
assertTrue(file1Status.getLen() > sourceLimit);
assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
// Test json -> Avro
InputBatch<JavaRDD<GenericRecord>> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1.getBatch().get().count());
// Test json -> Row format
InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), 1000000);
InputBatch<Dataset<Row>> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1AsRows.getBatch().get().count());
// Test Avro -> Row format
Dataset<Row> fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()),
Expand All @@ -113,5 +124,69 @@ public void testJsonDFSSource() throws IOException {
InputBatch<JavaRDD<GenericRecord>> fetch4 =
jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());

// 5. Extract from the beginning
InputBatch<JavaRDD<GenericRecord>> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());
}

@Test
public void testParquetDFSSource() throws IOException {
dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles"));
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();

TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles");
ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource);

// 1. Extract without any checkpoint => get all the data, respecting sourceLimit
assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
List<GenericRecord> batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator);
Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet");
Helpers.saveParquetToDFS(batch1, file1);
// Test respecting sourceLimit
int sourceLimit = 10;
RemoteIterator<LocatedFileStatus> files = dfs.listFiles(file1, true);
FileStatus file1Status = files.next();
assertTrue(file1Status.getLen() > sourceLimit);
assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch());
// Test parquet -> Avro
InputBatch<JavaRDD<GenericRecord>> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1.getBatch().get().count());
// Test parquet -> Row
InputBatch<Dataset<Row>> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(100, fetch1AsRows.getBatch().get().count());

// 2. Produce new data, extract new data
List<GenericRecord> batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator);
Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet");
Helpers.saveParquetToDFS(batch2, file2);
// Test parquet -> Avro
InputBatch<JavaRDD<GenericRecord>> fetch2 =
parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2.getBatch().get().count());
// Test parquet -> Row
InputBatch<Dataset<Row>> fetch2AsRows =
parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch2AsRows.getBatch().get().count());

// 3. Extract with previous checkpoint => gives same data back (idempotent)
InputBatch<Dataset<Row>> fetch3AsRows =
parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(10000, fetch3AsRows.getBatch().get().count());
assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch());
fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table");
Dataset<Row> rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table");
assertEquals(10000, rowDataset.count());

// 4. Extract with latest checkpoint => no new data returned
InputBatch<JavaRDD<GenericRecord>> fetch4 =
parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE);
assertEquals(Option.empty(), fetch4.getBatch());

// 5. Extract from the beginning
InputBatch<JavaRDD<GenericRecord>> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());
}
}

0 comments on commit ed745df

Please sign in to comment.