Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzelin committed Apr 1, 2024
1 parent 5502bd7 commit 3de594c
Show file tree
Hide file tree
Showing 13 changed files with 35 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public AppendOnlyWriter(
public void write(InternalRow rowData) throws Exception {
Preconditions.checkArgument(
rowData.getRowKind().isAdd(),
"Append-only writer can only accept insert or update_after row kind, but current row kind is: %s",
"Append-only writer can only accept insert or update_after row kind, but current row kind is: %s. "
+ "You can configure 'ignore-delete' to ignore retract records.",
rowData.getRowKind());
boolean success = sinkWriter.write(rowData);
if (!success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public void add(KeyValue kv) {
// refresh key object to avoid reference overwritten
currentKey = kv.key();

// ignore or retract?
if (kv.valueKind().isRetract()) {
if (fieldSequenceEnabled) {
retractWithSequenceGroup(kv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
private final int writerNumberMax;
@Nullable private final IndexMaintainer.Factory<T> indexFactory;
@Nullable private final DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory;
protected final boolean ignoreDelete;

@Nullable protected IOManager ioManager;

Expand All @@ -88,8 +87,7 @@ protected AbstractFileStoreWrite(
@Nullable IndexMaintainer.Factory<T> indexFactory,
@Nullable DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory,
String tableName,
int writerNumberMax,
boolean ignoreDelete) {
int writerNumberMax) {
this.commitUser = commitUser;
this.snapshotManager = snapshotManager;
this.scan = scan;
Expand All @@ -98,7 +96,6 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.ignoreDelete = ignoreDelete;
}

@Override
Expand All @@ -125,19 +122,13 @@ public void withCompactExecutor(ExecutorService compactExecutor) {

@Override
public void write(BinaryRow partition, int bucket, T data) throws Exception {
if (skipData(data)) {
return;
}

WriterContainer<T> container = getWriterWrapper(partition, bucket);
container.writer.write(data);
if (container.indexMaintainer != null) {
container.indexMaintainer.notifyNewRecord(data);
}
}

protected abstract boolean skipData(T data);

@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
getWriterWrapper(partition, bucket).writer.compact(fullCompaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,4 @@ protected void forceBufferSpill() throws Exception {
}
}
}

@Override
protected boolean skipData(InternalRow data) {
return ignoreDelete && data.getRowKind().isRetract();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,6 @@ protected MergeTreeWriter createWriter(
UserDefinedSeqComparator.create(valueType, options));
}

@Override
protected boolean skipData(KeyValue data) {
return ignoreDelete && data.valueKind().isRetract();
}

@VisibleForTesting
public boolean bufferSpillable() {
return options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ public MemoryFileStoreWrite(
indexFactory,
deletionVectorsMaintainerFactory,
tableName,
options.writeMaxWritersToSpill(),
options.ignoreDelete());
options.writeMaxWritersToSpill());
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ record -> {
"Append only writer can not accept row with RowKind %s",
record.row().getRowKind());
return record.row();
});
},
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ record -> {
? row.getRowKind()
: rowKindGenerator.generate(row);
return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
});
},
CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Restorable;

import javax.annotation.Nullable;

import java.util.List;
import java.util.concurrent.ExecutorService;

Expand All @@ -47,17 +49,20 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
private final boolean ignoreDelete;

private boolean batchCommitted = false;
private BucketMode bucketMode;

public TableWriteImpl(
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor) {
RecordExtractor<T> recordExtractor,
boolean ignoreDelete) {
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.ignoreDelete = ignoreDelete;
}

@Override
Expand Down Expand Up @@ -121,13 +126,21 @@ public void write(InternalRow row, int bucket) throws Exception {
writeAndReturn(row, bucket);
}

@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
if (ignoreDelete && row.getRowKind().isRetract()) {
return null;
}
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
return record;
}

@Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
if (ignoreDelete && row.getRowKind().isRetract()) {
return null;
}
SinkRecord record = toSinkRecord(row, bucket);
write.write(record.partition(), bucket, recordExtractor.extract(record));
return record;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,22 @@ public GlobalFullCompactionSinkWrite(
}

@Override
@Nullable
public SinkRecord write(InternalRow rowData) throws Exception {
SinkRecord sinkRecord = super.write(rowData);
touchBucket(sinkRecord.partition(), sinkRecord.bucket());
if (sinkRecord != null) {
touchBucket(sinkRecord.partition(), sinkRecord.bucket());
}
return sinkRecord;
}

@Override
@Nullable
public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
SinkRecord sinkRecord = super.write(rowData, bucket);
touchBucket(sinkRecord.partition(), bucket);
if (sinkRecord != null) {
touchBucket(sinkRecord.partition(), bucket);
}
return sinkRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ record = write.write(element.getValue());
throw new IOException(e);
}

if (logSinkFunction != null) {
if (record != null && logSinkFunction != null) {
// write to log store, need to preserve original pk (which includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
/** Helper class of {@link PrepareCommitOperator} for different types of paimon sinks. */
public interface StoreSinkWrite {

@Nullable
SinkRecord write(InternalRow rowData) throws Exception;

@Nullable
SinkRecord write(InternalRow rowData, int bucket) throws Exception;

SinkRecord toLogRecord(SinkRecord record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,13 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
}

@Override
@Nullable
public SinkRecord write(InternalRow rowData) throws Exception {
return write.writeAndReturn(rowData);
}

@Override
@Nullable
public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
return write.writeAndReturn(rowData, bucket);
}
Expand Down

0 comments on commit 3de594c

Please sign in to comment.