Skip to content

Commit

Permalink
[core] Add batch record write interface for table. (apache#4034)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored Aug 23, 2024
1 parent 16cf881 commit ee49c92
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.format;

import org.apache.paimon.io.BatchRecords;

import java.io.IOException;

/** Format write with batch interface. */
public interface BatchFormatWriter extends FormatWriter {

/**
* Write a batch of records directly.
*
* @param batchRecords the records to be written
* @throws IOException if exception happens
*/
void writeBatch(BatchRecords batchRecords) throws IOException;
}
32 changes: 32 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/io/BatchRecords.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;

/** Interface of Batch records. */
public interface BatchRecords extends Iterable<InternalRow> {

/**
* The total row count of this batch.
*
* @return the number of row count.
*/
long rowCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
Expand All @@ -40,6 +41,7 @@
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BatchRecordWriter;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.IOFunction;
import org.apache.paimon.utils.IOUtils;
Expand All @@ -60,7 +62,7 @@
* A {@link RecordWriter} implementation that only accepts records which are always insert
* operations and don't have any unique keys or sort keys.
*/
public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner {
public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {

private final FileIO fileIO;
private final long schemaId;
Expand Down Expand Up @@ -165,6 +167,17 @@ public void write(InternalRow rowData) throws Exception {
}
}

@Override
public void writeBatch(BatchRecords batchRecords) throws Exception {
if (sinkWriter instanceof BufferedSinkWriter) {
for (InternalRow row : batchRecords) {
write(row);
}
} else {
((DirectSinkWriter) sinkWriter).writeBatch(batchRecords);
}
}

@Override
public void compact(boolean fullCompaction) throws Exception {
flush(true, fullCompaction);
Expand Down Expand Up @@ -386,6 +399,13 @@ public boolean write(InternalRow data) throws IOException {
return true;
}

public void writeBatch(BatchRecords batch) throws IOException {
if (writer == null) {
writer = createRollingRowWriter();
}
writer.writeBatch(batch);
}

@Override
public List<DataFileMeta> flush() throws IOException {
List<DataFileMeta> flushedFiles = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,30 @@ public void write(T row) throws IOException {
}
}

public void writeBatch(BatchRecords batch) throws IOException {
try {
// Open the current writer if write the first record or roll over happen before.
if (currentWriter == null) {
openCurrentWriter();
}

currentWriter.writeBatch(batch);
recordCount += batch.rowCount();

if (rollingFile()) {
closeCurrentWriter();
}
} catch (Throwable e) {
LOG.warn(
"Exception occurs when writing file "
+ (currentWriter == null ? null : currentWriter.path())
+ ". Cleaning up.",
e);
abort();
throw e;
}
}

private void openCurrentWriter() {
currentWriter = writerFactory.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.io;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BatchFormatWriter;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.AsyncPositionOutputStream;
Expand Down Expand Up @@ -94,6 +95,27 @@ public void write(T record) throws IOException {
writeImpl(record);
}

public void writeBatch(BatchRecords batchRecords) throws IOException {
if (closed) {
throw new RuntimeException("Writer has already closed!");
}

try {
if (writer instanceof BatchFormatWriter) {
((BatchFormatWriter) writer).writeBatch(batchRecords);
} else {
for (InternalRow row : batchRecords) {
writer.addElement(row);
}
}
recordCount += batchRecords.rowCount();
} catch (Throwable e) {
LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e);
abort();
throw e;
}
}

protected InternalRow writeImpl(T record) throws IOException {
if (closed) {
throw new RuntimeException("Writer has already closed!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public Map<BinaryRow, List<Integer>> getActiveBuckets() {
return result;
}

private WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
protected WriterContainer<T> getWriterWrapper(BinaryRow partition, int bucket) {
Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
if (buckets == null) {
buckets = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
Expand Down Expand Up @@ -62,7 +63,8 @@
import java.util.function.Function;

/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow> {
public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
implements BatchWriter {

private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyFileStoreWrite.class);

Expand Down Expand Up @@ -271,4 +273,10 @@ protected void forceBufferSpill() throws Exception {
}
}
}

@Override
public void writeBatch(BinaryRow partition, int bucket, BatchRecords data) throws Exception {
WriterContainer<InternalRow> container = getWriterWrapper(partition, bucket);
((AppendOnlyWriter) container.writer).writeBatch(data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.BatchRecords;

/** Whether the writer could write batch. */
public interface BatchWriter {

/**
* Write the batch data to the store according to the partition and bucket.
*
* @param partition the partition of the data
* @param bucket the bucket id of the data
* @param data the given data
* @throws Exception the thrown exception when writing the record
*/
void writeBatch(BinaryRow partition, int bucket, BatchRecords data) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.Table;
Expand Down Expand Up @@ -61,6 +62,9 @@ public interface TableWrite extends AutoCloseable {
/** Write a row with bucket. */
void write(InternalRow row, int bucket) throws Exception;

/** Write a batch records directly, not per row. */
void writeBatch(BinaryRow partition, int bucket, BatchRecords batch) throws Exception;

/**
* Compact a bucket of a partition. By default, it will determine whether to perform the
* compaction according to the 'num-sorted-run.compaction-trigger' option. If fullCompaction is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.BatchWriter;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.table.BucketMode;
Expand Down Expand Up @@ -150,6 +152,17 @@ public void write(InternalRow row, int bucket) throws Exception {
writeAndReturn(row, bucket);
}

@Override
public void writeBatch(BinaryRow partition, int bucket, BatchRecords batch) throws Exception {
if (write instanceof BatchWriter) {
((BatchWriter) write).writeBatch(partition, bucket, batch);
} else {
for (InternalRow row : batch) {
write(row, bucket);
}
}
}

@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
checkNullability(row);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.BatchRecords;

/** Write {@link BatchRecords} directly. */
public interface BatchRecordWriter extends RecordWriter<InternalRow> {

/** Add a batch elemens to the writer. */
void writeBatch(BatchRecords record) throws Exception;
}
Loading

0 comments on commit ee49c92

Please sign in to comment.