Skip to content

Commit

Permalink
Revert "[core] Add batch record write interface for table. (#4029)"
Browse files Browse the repository at this point in the history
This reverts commit 9c300c9.
  • Loading branch information
JingsongLi committed Aug 22, 2024
1 parent 9c300c9 commit 1d84aa5
Show file tree
Hide file tree
Showing 10 changed files with 0 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.paimon.format;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.BatchRecords;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/** The writer that writes records. */
public interface FormatWriter {
Expand All @@ -49,10 +47,6 @@ public interface FormatWriter {
*/
void flush() throws IOException;

/**
 * Writes a whole batch of records in one call.
 *
 * <p>The default implementation rejects batch writes; format writers that support batch
 * writing override this method.
 *
 * @param batchRecords the batch of records to write
 * @throws IOException if the format writer fails while writing
 * @throws UnsupportedOperationException if this format writer does not support batch writes
 */
default void writeBatch(BatchRecords batchRecords) throws IOException {
    // UnsupportedOperationException is the conventional type for an unimplemented
    // operation. UnsupportedEncodingException signals a missing charset encoding and was
    // misleading here; this also matches the RecordWriter#writeBatch default elsewhere in
    // the project, which throws UnsupportedOperationException("Not supported").
    throw new UnsupportedOperationException("Not supported.");
}

/**
* Finishes the writing. This must flush all internal buffer, finish encoding, and write
* footers.
Expand Down
27 changes: 0 additions & 27 deletions paimon-common/src/main/java/org/apache/paimon/io/BatchRecords.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
Expand Down Expand Up @@ -173,14 +172,6 @@ public void write(InternalRow rowData) throws Exception {
}
}

/**
 * Writes a whole batch of records straight through to the underlying sink writer.
 *
 * <p>Only a {@code DirectSinkWriter} can accept a batch as-is; a {@code BufferedSinkWriter}
 * is rejected explicitly.
 *
 * @param batchRecords the batch of records to write
 * @throws UnsupportedOperationException if the current sink writer buffers records
 * @throws Exception if the direct sink writer fails while writing
 */
@Override
public void writeBatch(BatchRecords batchRecords) throws Exception {
    if (sinkWriter instanceof BufferedSinkWriter) {
        // Fixed message typo ("Not suppor") and switched to the conventional
        // UnsupportedOperationException (a RuntimeException subclass, so callers catching
        // RuntimeException are unaffected).
        throw new UnsupportedOperationException(
                "writeBatch is not supported for BufferedSinkWriter.");
    }
    ((DirectSinkWriter) sinkWriter).writeBatch(batchRecords);
}

@Override
public void compact(boolean fullCompaction) throws Exception {
flush(true, fullCompaction);
Expand Down Expand Up @@ -400,14 +391,6 @@ public boolean write(InternalRow data) throws IOException {
return true;
}

/**
 * Writes a whole batch of records, lazily creating the rolling row writer on first use.
 *
 * @param batch the batch of records to append
 * @return always {@code true} (unconditional in this implementation)
 * @throws IOException if the underlying rolling writer fails
 */
public boolean writeBatch(BatchRecords batch) throws IOException {
    // Lazily initialize, mirroring the row-at-a-time write path above.
    if (writer == null) {
        writer = createRollingRowWriter();
    }
    writer.writeBatch(batch);
    return true;
}

@Override
public List<DataFileMeta> flush() throws IOException {
List<DataFileMeta> flushedFiles = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,30 +95,6 @@ public void write(T row) throws IOException {
}
}

/**
 * Writes a whole batch of records to the current file, closing the current writer when
 * {@code rollingFile()} signals a roll-over.
 *
 * <p>On any failure the partially written file is cleaned up via {@link #abort} and the
 * original error is rethrown.
 *
 * @param batch the batch of records to write
 * @throws IOException if writing fails
 */
public void writeBatch(BatchRecords batch) throws IOException {
    try {
        // Open the current writer if write the first record or roll over happen before.
        if (currentWriter == null) {
            openCurrentWriter();
        }

        currentWriter.writeBatch(batch);
        recordCount += batch.rowCount();

        // NOTE(review): the roll-over check runs once per batch, not per row, so a single
        // large batch may overshoot the target file size — confirm this is acceptable.
        if (rollingFile()) {
            closeCurrentWriter();
        }
    } catch (Throwable e) {
        // Catch Throwable (not just IOException) so even Errors trigger cleanup of the
        // half-written file; Java's precise rethrow keeps the declared IOException valid.
        LOG.warn(
                "Exception occurs when writing file "
                        + (currentWriter == null ? null : currentWriter.path())
                        + ". Cleaning up.",
                e);
        abort();
        throw e;
    }
}

private void openCurrentWriter() {
currentWriter = writerFactory.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,6 @@ public void write(T record) throws IOException {
writeImpl(record);
}

/**
 * Writes a whole batch of records and advances the record count by the batch's row count.
 *
 * @param batchRecords the batch of records to write
 * @throws IOException if the underlying writer fails
 * @throws RuntimeException if this writer was already closed (same exception type and
 *     message as {@code writeImpl}, kept consistent)
 */
public void writeBatch(BatchRecords batchRecords) throws IOException {
    if (closed) {
        throw new RuntimeException("Writer has already closed!");
    }

    writer.writeBatch(batchRecords);
    recordCount += batchRecords.rowCount();
}

protected InternalRow writeImpl(T record) throws IOException {
if (closed) {
throw new RuntimeException("Writer has already closed!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestEntry;
Expand Down Expand Up @@ -144,12 +143,6 @@ public void write(BinaryRow partition, int bucket, T data) throws Exception {
}
}

/** Writes a whole batch to the writer of the given partition and bucket. */
@Override
public void writeBatch(BinaryRow partition, int bucket, BatchRecords data) throws Exception {
    // Dispatch directly, in the same one-liner style as compact() below.
    getWriterWrapper(partition, bucket).writer.writeBatch(data);
}

@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
getWriterWrapper(partition, bucket).writer.compact(fullCompaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -105,16 +104,6 @@ default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
*/
void write(BinaryRow partition, int bucket, T data) throws Exception;

/**
 * Write a batch of records to the store according to the partition and bucket.
 *
 * @param partition the partition the batch belongs to
 * @param bucket the bucket id within the partition
 * @param data the batch of records to write
 * @throws Exception the thrown exception when writing the batch
 */
void writeBatch(BinaryRow partition, int bucket, BatchRecords data) throws Exception;

/**
* Compact data stored in given partition and bucket. Note that compaction process is only
* submitted and may not be completed when the method returns.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.paimon.table.sink;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.BatchRecords;

import java.util.List;

Expand All @@ -38,7 +36,4 @@ public interface BatchTableWrite extends TableWrite {
* @see BatchTableCommit#commit
*/
List<CommitMessage> prepareCommit() throws Exception;

/** Write a batch of records directly, rather than one row at a time. */
void writeBatch(BinaryRow partition, int bucket, BatchRecords batch) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
Expand Down Expand Up @@ -247,11 +246,6 @@ public List<CommitMessage> prepareCommit() throws Exception {
return prepareCommit(true, BatchWriteBuilder.COMMIT_IDENTIFIER);
}

/** Delegates the batch unchanged to the wrapped store write. */
@Override
public void writeBatch(BinaryRow partition, int bucket, BatchRecords batch) throws Exception {
    write.writeBatch(partition, bucket, batch);
}

@Override
public void close() throws Exception {
write.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.utils;

import org.apache.paimon.io.BatchRecords;
import org.apache.paimon.io.DataFileMeta;

import java.util.Collection;
Expand All @@ -36,11 +35,6 @@ public interface RecordWriter<T> {
/** Add a key-value element to the writer. */
void write(T record) throws Exception;

/** Add a batch of elements to the writer. The default implementation does not support it. */
default void writeBatch(BatchRecords record) throws Exception {
    throw new UnsupportedOperationException("Not supported");
}

/**
* Compact files related to the writer. Note that compaction process is only submitted and may
* not be completed when the method returns.
Expand Down

0 comments on commit 1d84aa5

Please sign in to comment.