Skip to content

Commit

Permalink
[core] Move commit callbacks from TableCommit into FileStoreCommit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Jul 29, 2024
1 parent 499ba13 commit e551e33
Show file tree
Hide file tree
Showing 21 changed files with 178 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
Expand All @@ -55,6 +56,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

Expand Down Expand Up @@ -184,6 +186,11 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {

@Override
public FileStoreCommitImpl newCommit(String commitUser) {
return newCommit(commitUser, Collections.emptyList());
}

@Override
public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> callbacks) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
Expand All @@ -205,7 +212,8 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.branch(),
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism());
options.scanManifestParallelism(),
callbacks);
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -79,6 +80,8 @@ public interface FileStore<T> {

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, List<CommitCallback> callbacks);

SnapshotDeletion newSnapshotDeletion();

ChangelogDeletion newChangelogDeletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,14 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
throws TableNotExistException {
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit =
try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.dropPartitions(
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
commit.dropPartitions(
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}

protected abstract void createDatabaseImpl(String name, Map<String, String> properties);
Expand Down Expand Up @@ -354,8 +355,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
Table table = getDataTable(identifier);
return table;
return getDataTable(identifier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
Expand All @@ -45,6 +46,8 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -111,18 +114,26 @@ public IcebergCommitCallback(FileStoreTable table, String commitUser) {
}

@Override
public void call(List<ManifestCommittable> committables) {
for (ManifestCommittable committable : committables) {
try {
commitMetadata(committable);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
public void call(
List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
try {
commitMetadata(identifier);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void retry(ManifestCommittable committable) {
try {
commitMetadata(committable.identifier());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private void commitMetadata(ManifestCommittable committable) throws IOException {
Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(committable.identifier());
private void commitMetadata(long identifier) throws IOException {
Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(identifier);
long currentSnapshot = pair.getLeft();
Long baseSnapshot = pair.getRight();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package org.apache.paimon.metastore;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;

import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.List;

Expand All @@ -48,9 +52,18 @@ public AddPartitionCommitCallback(MetastoreClient client) {
}

@Override
public void call(List<ManifestCommittable> committables) {
committables.stream()
.flatMap(c -> c.fileCommittables().stream())
public void call(
List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
committedEntries.stream()
.filter(e -> FileKind.ADD.equals(e.kind()))
.map(ManifestEntry::partition)
.distinct()
.forEach(this::addPartition);
}

@Override
public void retry(ManifestCommittable committable) {
committable.fileCommittables().stream()
.map(CommitMessage::partition)
.distinct()
.forEach(this::addPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.paimon.metastore;

import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.tag.TagPreview;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

Expand All @@ -37,12 +40,19 @@ public TagPreviewCommitCallback(AddPartitionTagCallback tagCallback, TagPreview
}

@Override
public void call(List<ManifestCommittable> committables) {
public void call(
List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
long currentMillis = System.currentTimeMillis();
Optional<String> tagOptional = tagPreview.extractTag(currentMillis, watermark);
tagOptional.ifPresent(tagCallback::notifyCreation);
}

@Override
public void retry(ManifestCommittable committable) {
long currentMillis = System.currentTimeMillis();
for (ManifestCommittable c : committables) {
Optional<String> tagOptional = tagPreview.extractTag(currentMillis, c.watermark());
tagOptional.ifPresent(tagCallback::notifyCreation);
}
Optional<String> tagOptional =
tagPreview.extractTag(currentMillis, committable.watermark());
tagOptional.ifPresent(tagCallback::notifyCreation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

/** Commit operation which provides commit and overwrite. */
public interface FileStoreCommit {
public interface FileStoreCommit extends AutoCloseable {

/** With global lock. */
FileStoreCommit withLock(Lock lock);

FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);

/** Find out which commit identifier need to be retried when recovering from the failure. */
Set<Long> filterCommitted(Set<Long> commitIdentifiers);
/** Find out which committables need to be retried when recovering from the failure. */
List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables);

/** Commit from manifest committable. */
void commit(ManifestCommittable committable, Map<String, String> properties);
Expand Down Expand Up @@ -88,4 +87,7 @@ void overwrite(
FileStorePathFactory pathFactory();

FileIO fileIO();

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -121,6 +123,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;
@Nullable private final Integer manifestReadParallelism;
private final List<CommitCallback> commitCallbacks;

@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
Expand Down Expand Up @@ -152,7 +155,8 @@ public FileStoreCommitImpl(
String branchName,
StatsFileHandler statsFileHandler,
BucketMode bucketMode,
@Nullable Integer manifestReadParallelism) {
@Nullable Integer manifestReadParallelism,
List<CommitCallback> commitCallbacks) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
Expand All @@ -172,6 +176,7 @@ public FileStoreCommitImpl(
this.keyComparator = keyComparator;
this.branchName = branchName;
this.manifestReadParallelism = manifestReadParallelism;
this.commitCallbacks = commitCallbacks;

this.lock = null;
this.ignoreEmptyCommit = true;
Expand All @@ -193,25 +198,33 @@ public FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit) {
}

@Override
public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables) {
// nothing to filter, fast exit
if (commitIdentifiers.isEmpty()) {
return commitIdentifiers;
if (committables.isEmpty()) {
return committables;
}

for (int i = 1; i < committables.size(); i++) {
Preconditions.checkArgument(
committables.get(i).identifier() > committables.get(i - 1).identifier(),
"Committables must be sorted according to identifiers before filtering. This is unexpected.");
}

Optional<Snapshot> latestSnapshot = snapshotManager.latestSnapshotOfUser(commitUser);
if (latestSnapshot.isPresent()) {
Set<Long> result = new HashSet<>();
for (Long identifier : commitIdentifiers) {
List<ManifestCommittable> result = new ArrayList<>();
for (ManifestCommittable committable : committables) {
// if committable is newer than latest snapshot, then it hasn't been committed
if (identifier > latestSnapshot.get().commitIdentifier()) {
result.add(identifier);
if (committable.identifier() > latestSnapshot.get().commitIdentifier()) {
result.add(committable);
} else {
commitCallbacks.forEach(callback -> callback.retry(committable));
}
}
return result;
} else {
// if there is no previous snapshots then nothing should be filtered
return commitIdentifiers;
return committables;
}
}

Expand Down Expand Up @@ -986,6 +999,7 @@ public boolean tryCommitOnce(
identifier,
commitKind.name()));
}
commitCallbacks.forEach(callback -> callback.call(tableFiles, identifier, watermark));
return true;
}

Expand Down Expand Up @@ -1229,6 +1243,13 @@ private void cleanUpTmpManifests(
}
}

@Override
public void close() {
for (CommitCallback callback : commitCallbacks) {
IOUtils.closeQuietly(callback);
}
}

private static class LevelIdentifier {

private final BinaryRow partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -139,6 +140,12 @@ public FileStoreCommit newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
}

@Override
public FileStoreCommit newCommit(String commitUser, List<CommitCallback> callbacks) {
privilegeChecker.assertCanInsert(identifier);
return wrapped.newCommit(commitUser, callbacks);
}

@Override
public SnapshotDeletion newSnapshotDeletion() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,7 @@ public TableCommitImpl newCommit(String commitUser) {
}

return new TableCommitImpl(
store().newCommit(commitUser),
createCommitCallbacks(commitUser),
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
snapshotExpire,
options.writeOnly() ? null : store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
Expand All @@ -389,7 +388,7 @@ private List<CommitCallback> createCommitCallbacks(String commitUser) {

if (options.partitionedTableInMetastore()
&& metastoreClientFactory != null
&& tableSchema.partitionKeys().size() > 0) {
&& !tableSchema.partitionKeys().isEmpty()) {
callbacks.add(new AddPartitionCommitCallback(metastoreClientFactory.create()));
}

Expand Down
Loading

0 comments on commit e551e33

Please sign in to comment.