Skip to content

Commit

Permalink
[core] AbstractFileStoreWrite should not be the interface (apache#2446)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 4, 2023
1 parent 2378319 commit a73ef92
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;

import org.slf4j.Logger;
Expand All @@ -60,8 +59,7 @@
*
* @param <T> type of record to write.
*/
public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T>, Restorable<List<AbstractFileStoreWrite.State<T>>> {
public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);

Expand Down Expand Up @@ -116,6 +114,7 @@ public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
this.ignorePreviousFiles = ignorePreviousFiles;
}

@Override
public void withCompactExecutor(ExecutorService compactExecutor) {
this.lazyCompactExecutor = compactExecutor;
this.closeCompactExecutorWhenLeaving = false;
Expand Down Expand Up @@ -352,7 +351,7 @@ public WriterContainer<T> createWriterContainer(
}

@Override
public void isStreamingMode(boolean isStreamingMode) {
public void withExecutionMode(boolean isStreamingMode) {
this.isStreamingMode = isStreamingMode;
}

Expand Down Expand Up @@ -445,46 +444,4 @@ protected WriterContainer(
this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
}
}

/**
 * Recoverable state of {@link AbstractFileStoreWrite}.
 *
 * <p>A plain value holder: every field is {@code final} and is assigned exactly once, in the
 * constructor. The class defines no behavior of its own other than {@link #toString()}.
 *
 * @param <T> record type; used here only to parameterize the {@link IndexMaintainer}.
 */
public static class State<T> {
// Partition and bucket this state is recorded for.
protected final BinaryRow partition;
protected final int bucket;

// NOTE(review): the exact meaning of these two values is defined by the code that builds the
// state, which is not part of this class -- confirm there before relying on them.
protected final long baseSnapshotId;
protected final long lastModifiedCommitIdentifier;
// Own copy of the collection handed to the constructor (see below).
protected final List<DataFileMeta> dataFiles;
// May be null, as declared by the annotation.
@Nullable protected final IndexMaintainer<T> indexMaintainer;
protected final CommitIncrement commitIncrement;

/**
 * Creates a state from the given values.
 *
 * <p>The constructor is {@code protected}, so only subclasses and classes in the same package
 * can create instances.
 *
 * @param partition stored as-is
 * @param bucket stored as-is
 * @param baseSnapshotId stored as-is
 * @param lastModifiedCommitIdentifier stored as-is
 * @param dataFiles copied into a new {@link ArrayList}; later changes to the passed collection
 *     are not reflected in this state
 * @param indexMaintainer stored as-is; may be {@code null}
 * @param commitIncrement stored as-is
 */
protected State(
BinaryRow partition,
int bucket,
long baseSnapshotId,
long lastModifiedCommitIdentifier,
Collection<DataFileMeta> dataFiles,
@Nullable IndexMaintainer<T> indexMaintainer,
CommitIncrement commitIncrement) {
this.partition = partition;
this.bucket = bucket;
this.baseSnapshotId = baseSnapshotId;
this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
// Copy instead of keeping a reference to the caller's collection.
this.dataFiles = new ArrayList<>(dataFiles);
this.indexMaintainer = indexMaintainer;
this.commitIncrement = commitIncrement;
}

/**
 * Returns all fields, in declaration order, as a brace-enclosed, comma-separated list.
 *
 * @return the formatted field values
 */
@Override
public String toString() {
return String.format(
"{%s, %d, %d, %d, %s, %s, %s}",
partition,
bucket,
baseSnapshotId,
lastModifiedCommitIdentifier,
dataFiles,
indexMaintainer,
commitIncrement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,31 @@
import org.apache.paimon.FileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.Restorable;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;

/**
* Write operation which provides {@link RecordWriter} creation and writes {@link SinkRecord} to
* {@link FileStore}.
*
* @param <T> type of record to write.
*/
public interface FileStoreWrite<T> {
public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State<T>>> {

FileStoreWrite<T> withIOManager(IOManager ioManager);

Expand All @@ -64,6 +72,18 @@ default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
*/
void withIgnorePreviousFiles(boolean ignorePreviousFiles);

/**
* Sets the execution mode. When the write runs in batch mode (that is, not in streaming mode),
* some optimizations are applied.
*
* @param isStreamingMode whether in streaming mode
*/
void withExecutionMode(boolean isStreamingMode);

/** With metrics to measure compaction. */
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);

void withCompactExecutor(ExecutorService compactExecutor);

/**
* Write the data to the store according to the partition and bucket.
*
Expand Down Expand Up @@ -109,20 +129,53 @@ default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
throws Exception;

/**
* We detect whether it is in batch mode, if so, we do some optimization.
*
* @param isStreamingMode whether in streaming mode
*/
void isStreamingMode(boolean isStreamingMode);

/**
* Close the writer.
*
* @throws Exception the thrown exception
*/
void close() throws Exception;

/** With metrics to measure compaction. */
FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry);
/**
 * Recoverable state of {@link FileStoreWrite}.
 *
 * <p>A plain value holder: every field is {@code final} and is assigned exactly once, in the
 * constructor. The class defines no behavior of its own other than {@link #toString()}. Because
 * it is declared inside an interface, it is implicitly {@code public} and {@code static}.
 *
 * @param <T> record type; used here only to parameterize the {@link IndexMaintainer}.
 */
class State<T> {

// Partition and bucket this state is recorded for.
protected final BinaryRow partition;
protected final int bucket;

// NOTE(review): the exact meaning of these two values is defined by the code that builds the
// state, which is not part of this class -- confirm there before relying on them.
protected final long baseSnapshotId;
protected final long lastModifiedCommitIdentifier;
// Own copy of the collection handed to the constructor (see below).
protected final List<DataFileMeta> dataFiles;
// May be null, as declared by the annotation.
@Nullable protected final IndexMaintainer<T> indexMaintainer;
protected final CommitIncrement commitIncrement;

/**
 * Creates a state from the given values.
 *
 * <p>The constructor is {@code protected}, so only subclasses and classes in the same package
 * can create instances.
 *
 * @param partition stored as-is
 * @param bucket stored as-is
 * @param baseSnapshotId stored as-is
 * @param lastModifiedCommitIdentifier stored as-is
 * @param dataFiles copied into a new {@link ArrayList}; later changes to the passed collection
 *     are not reflected in this state
 * @param indexMaintainer stored as-is; may be {@code null}
 * @param commitIncrement stored as-is
 */
protected State(
BinaryRow partition,
int bucket,
long baseSnapshotId,
long lastModifiedCommitIdentifier,
Collection<DataFileMeta> dataFiles,
@Nullable IndexMaintainer<T> indexMaintainer,
CommitIncrement commitIncrement) {
this.partition = partition;
this.bucket = bucket;
this.baseSnapshotId = baseSnapshotId;
this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
// Copy instead of keeping a reference to the caller's collection.
this.dataFiles = new ArrayList<>(dataFiles);
this.indexMaintainer = indexMaintainer;
this.commitIncrement = commitIncrement;
}

/**
 * Returns all fields, in declaration order, as a brace-enclosed, comma-separated list.
 *
 * @return the formatted field values
 */
@Override
public String toString() {
return String.format(
"{%s, %d, %d, %d, %s, %s, %s}",
partition,
bucket,
baseSnapshotId,
lastModifiedCommitIdentifier,
dataFiles,
indexMaintainer,
commitIncrement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public BatchWriteBuilder withOverwrite(@Nullable Map<String, String> staticParti
public BatchTableWrite newWrite() {
return table.newWrite(commitUser)
.withIgnorePreviousFiles(staticPartition != null)
.isStreamingMode(false);
.withExecutionMode(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);

// Sets whether the write runs in streaming mode; some optimizations depend on this.
InnerTableWrite isStreamingMode(boolean isStreamingMode);
InnerTableWrite withExecutionMode(boolean isStreamingMode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.utils.Restorable;

import java.util.List;
Expand All @@ -41,10 +41,9 @@
*
* @param <T> type of record to write into {@link FileStore}.
*/
public class TableWriteImpl<T>
implements InnerTableWrite, Restorable<List<AbstractFileStoreWrite.State<T>>> {
public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {

private final AbstractFileStoreWrite<T> write;
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;

Expand All @@ -54,7 +53,7 @@ public TableWriteImpl(
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor) {
this.write = (AbstractFileStoreWrite<T>) write;
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
}
Expand All @@ -66,8 +65,8 @@ public TableWriteImpl<T> withIgnorePreviousFiles(boolean ignorePreviousFiles) {
}

@Override
public TableWriteImpl<T> isStreamingMode(boolean isStreamingMode) {
write.isStreamingMode(isStreamingMode);
public TableWriteImpl<T> withExecutionMode(boolean isStreamingMode) {
write.withExecutionMode(isStreamingMode);
return this;
}

Expand Down Expand Up @@ -183,17 +182,17 @@ public void close() throws Exception {
}

@Override
public List<AbstractFileStoreWrite.State<T>> checkpoint() {
public List<State<T>> checkpoint() {
return write.checkpoint();
}

@Override
public void restore(List<AbstractFileStoreWrite.State<T>> state) {
public void restore(List<State<T>> state) {
write.restore(state);
}

@VisibleForTesting
public AbstractFileStoreWrite<T> getWrite() {
public FileStoreWrite<T> getWrite() {
return write;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.HashIndexMaintainer;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
Expand Down Expand Up @@ -51,8 +52,7 @@ public void testOverwriteDynamicBucketTable() throws Exception {
TableWriteImpl batchTableWrite = (TableWriteImpl) builder.withOverwrite().newWrite();
HashIndexMaintainer indexMaintainer =
(HashIndexMaintainer)
batchTableWrite
.getWrite()
((AbstractFileStoreWrite<?>) (batchTableWrite.getWrite()))
.createWriterContainer(BinaryRow.EMPTY_ROW, 0, true)
.indexMaintainer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
Expand Down Expand Up @@ -686,7 +687,9 @@ public void testUsingTheSameCompactExecutor() throws Exception {
List<ExecutorService> compactExecutors = new ArrayList<>();
for (StoreSinkWrite storeSinkWrite : storeSinkWrites) {
StoreSinkWriteImpl storeSinkWriteImpl = (StoreSinkWriteImpl) storeSinkWrite;
compactExecutors.add(storeSinkWriteImpl.getWrite().getWrite().getCompactExecutor());
compactExecutors.add(
((AbstractFileStoreWrite<?>) storeSinkWriteImpl.getWrite().getWrite())
.getCompactExecutor());
}
assertThat(compactExecutors.get(0) == compactExecutors.get(1)).isTrue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
Expand Down Expand Up @@ -146,7 +146,7 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
state.stateValueFilter().filter(table.name(), part, bucket))
.withIOManager(paimonIOManager)
.withIgnorePreviousFiles(ignorePreviousFiles)
.isStreamingMode(isStreamingMode);
.withExecutionMode(isStreamingMode);

if (metricGroup != null) {
tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
Expand Down Expand Up @@ -240,7 +240,7 @@ public void replace(FileStoreTable newTable) throws Exception {
return;
}

List<? extends AbstractFileStoreWrite.State<?>> states = write.checkpoint();
List<? extends FileStoreWrite.State<?>> states = write.checkpoint();
write.close();
write = newTableWrite(newTable);
write.restore((List) states);
Expand Down

0 comments on commit a73ef92

Please sign in to comment.