improve hive createReader
wg1026688210 committed Mar 1, 2024
1 parent d31aaf0 commit 29f92e9
Showing 6 changed files with 102 additions and 9 deletions.
@@ -48,8 +48,7 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
     @Override
     public RecordReader<Void, RowDataContainer> getRecordReader(
             InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-        FileStoreTable table = createFileStoreTable(jobConf);
         PaimonInputSplit split = (PaimonInputSplit) inputSplit;
-        return createRecordReader(table, split, jobConf);
+        return createRecordReader(split, jobConf);
     }
 }
@@ -21,7 +21,9 @@
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataOutputSerializer;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
@@ -41,12 +43,15 @@ public class PaimonInputSplit extends FileSplit {
     private String path;
     private DataSplit split;
 
+    private FileStoreTable table;
+
     // public no-argument constructor for deserialization
     public PaimonInputSplit() {}
 
-    public PaimonInputSplit(String path, DataSplit split) {
+    public PaimonInputSplit(String path, DataSplit split, FileStoreTable table) {
         this.path = path;
         this.split = split;
+        this.table = table;
     }
 
     public DataSplit split() {
@@ -73,13 +78,28 @@ public String[] getLocations() {
         return ANYWHERE;
     }
 
+    public FileStoreTable getTable() {
+        return table;
+    }
+
     @Override
     public void write(DataOutput dataOutput) throws IOException {
         dataOutput.writeUTF(path);
         DataOutputSerializer out = new DataOutputSerializer(128);
         split.serialize(out);
         dataOutput.writeInt(out.length());
         dataOutput.write(out.getCopyOfBuffer());
+        writeFileStoreTable(dataOutput);
     }
 
+    private void writeFileStoreTable(DataOutput dataOutput) throws IOException {
+        if (table == null) {
+            dataOutput.writeInt(0);
+        } else {
+            byte[] bytes = InstantiationUtil.serializeObject(table);
+            dataOutput.writeInt(bytes.length);
+            dataOutput.write(bytes);
+        }
+    }
+
     @Override
@@ -89,6 +109,22 @@ public void readFields(DataInput dataInput) throws IOException {
         byte[] bytes = new byte[length];
         dataInput.readFully(bytes);
         split = DataSplit.deserialize(new DataInputDeserializer(bytes));
+        readFileStoreTable(dataInput);
     }
 
+    private void readFileStoreTable(DataInput dataInput) throws IOException {
+        int length = dataInput.readInt();
+        if (length > 0) {
+            byte[] bytes = new byte[length];
+            dataInput.readFully(bytes);
+            try {
+                table =
+                        InstantiationUtil.deserializeObject(
+                                bytes, Thread.currentThread().getContextClassLoader());
+            } catch (ClassNotFoundException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     @Override
@@ -105,11 +141,13 @@ public boolean equals(Object o) {
             return false;
         }
         PaimonInputSplit that = (PaimonInputSplit) o;
-        return Objects.equals(path, that.path) && Objects.equals(split, that.split);
+        return Objects.equals(path, that.path)
+                && Objects.equals(split, that.split)
+                && Objects.equals(table, that.table);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(path, split);
+        return Objects.hash(path, split, table);
     }
 }
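The new write()/readFields() pair stores the embedded FileStoreTable as a length-prefixed byte block, writing a length of 0 when the table is null so a table-less split still round-trips. A minimal, self-contained sketch of that layout, with plain JDK object serialization standing in for Paimon's InstantiationUtil (the class and method names below are hypothetical):

    import java.io.*;

    // Sketch of the length-prefixed, null-tolerant field layout used by
    // write()/readFields() above. Plain JDK object serialization stands in
    // for Paimon's InstantiationUtil.
    public class NullableObjectField {

        // null is encoded as a zero length, otherwise length + serialized bytes.
        static void writeNullable(DataOutput out, Serializable value) throws IOException {
            if (value == null) {
                out.writeInt(0);
                return;
            }
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
                oos.writeObject(value);
            }
            byte[] bytes = buffer.toByteArray();
            out.writeInt(bytes.length);
            out.write(bytes);
        }

        static Object readNullable(DataInput in) throws IOException {
            int length = in.readInt();
            if (length == 0) {
                return null; // nothing was written for this field
            }
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException(e);
            }
        }
    }

Encoding null as a zero length keeps the format self-describing: the reader never has to guess whether table bytes follow the DataSplit bytes.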
@@ -174,7 +174,8 @@ public float getProgress() throws IOException {
     }
 
     public static RecordReader<Void, RowDataContainer> createRecordReader(
-            FileStoreTable table, PaimonInputSplit split, JobConf jobConf) throws IOException {
+            PaimonInputSplit split, JobConf jobConf) throws IOException {
+        FileStoreTable table = split.getTable();
         ReadBuilder readBuilder = table.newReadBuilder();
         createPredicate(table.schema(), jobConf, true).ifPresent(readBuilder::withFilter);
         List<String> paimonColumns = table.schema().fieldNames();
@@ -96,7 +96,10 @@ public static InputSplit[] generateSplits(FileStoreTable table, JobConf jobConf)
             scan.plan()
                     .splits()
                     .forEach(
-                            split -> splits.add(new PaimonInputSplit(location, (DataSplit) split)));
+                            split ->
+                                    splits.add(
+                                            new PaimonInputSplit(
+                                                    location, (DataSplit) split, table)));
         }
         return splits.toArray(new InputSplit[0]);
     }
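With generateSplits now passing the table into every PaimonInputSplit, the table travels with the split to the task side, where createRecordReader(split, jobConf) recovers it via split.getTable() instead of rebuilding it from the JobConf. A hedged sketch of that end-to-end flow, combining only calls that appear in this commit (the wrapper method itself is hypothetical and assumes it sits next to the static createRecordReader shown above):

    // Hedged sketch of the flow this commit establishes; not part of the commit.
    static RecordReader<Void, RowDataContainer> openFirstSplit(
            FileStoreTable table, String location, JobConf jobConf) throws IOException {
        // Planning side: capture the table inside the split...
        DataSplit dataSplit = (DataSplit) table.newScan().plan().splits().get(0);
        PaimonInputSplit split = new PaimonInputSplit(location, dataSplit, table);
        // ...task side: the reader recovers the table from the split itself
        // instead of rebuilding it from the JobConf.
        return createRecordReader(split, jobConf);
    }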
@@ -19,8 +19,20 @@
 package org.apache.paimon.hive.mapred;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileTestDataGenerator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -29,8 +41,11 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -62,8 +77,12 @@ public void testWriteAndRead() throws Exception {
                                 .map(d -> d.meta)
                                 .collect(Collectors.toList()))
                         .build();
-        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit);
+        PaimonInputSplit split = new PaimonInputSplit(tempDir.toString(), dataSplit, null);
 
+        assertPaimonInputSplitSerialization(split);
+    }
+
+    private void assertPaimonInputSplitSerialization(PaimonInputSplit split) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream output = new DataOutputStream(baos);
         split.write(output);
@@ -75,4 +94,37 @@ public void testWriteAndRead() throws Exception {
         actual.readFields(input);
         assertThat(actual).isEqualTo(split);
     }
+
+    @Test
+    public void testWriteAndReadWithTable() throws Exception {
+        Path path = new Path(tempDir.toString());
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE).getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+
+        FileStoreTable fileStoreTable = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        writeData(fileStoreTable);
+
+        DataSplit split = (DataSplit) fileStoreTable.newScan().plan().splits().get(0);
+
+        PaimonInputSplit paimonInputSplit =
+                new PaimonInputSplit(path.toString(), split, fileStoreTable);
+
+        assertPaimonInputSplitSerialization(paimonInputSplit);
+    }
+
+    private void writeData(FileStoreTable fileStoreTable) throws Exception {
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> tableWrite = fileStoreTable.newWrite(commitUser);
+        tableWrite.write(GenericRow.of(BinaryString.fromString("1111")));
+        TableCommitImpl commit = fileStoreTable.newCommit(commitUser);
+        commit.commit(0, tableWrite.prepareCommit(true, 0));
+        tableWrite.close();
+        commit.close();
+    }
 }
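The serialization check is factored into assertPaimonInputSplitSerialization so both tests share one write-then-readFields round trip. The same pattern works for any Hadoop Writable; a generic sketch (a hypothetical helper, not part of this commit):

    import org.apache.hadoop.io.Writable;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.function.Supplier;

    // Generic form of the round-trip assertion used above: write a Writable,
    // read it back into a fresh instance, and return the copy so the caller
    // can compare it with the original.
    final class WritableRoundTrip {

        static <T extends Writable> T roundTrip(T original, Supplier<T> factory) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            T copy = factory.get();
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        }
    }

With such a helper, the assertion above could be written as assertThat(roundTrip(split, PaimonInputSplit::new)).isEqualTo(split).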
@@ -155,7 +155,7 @@ private PaimonRecordReader read(
         List<String> originalColumns = ((FileStoreTable) table).schema().fieldNames();
         return new PaimonRecordReader(
                 table.newReadBuilder(),
-                new PaimonInputSplit(tempDir.toString(), dataSplit),
+                new PaimonInputSplit(tempDir.toString(), dataSplit, (FileStoreTable) table),
                 originalColumns,
                 originalColumns,
                 selectedColumns,