Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Move commit callbacks from TableCommit into FileStoreCommit #3834

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
Expand All @@ -55,6 +56,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

Expand Down Expand Up @@ -184,6 +186,11 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {

/** Creates a commit for {@code commitUser} with an empty list of commit callbacks. */
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
    // Delegate to the two-argument overload so both entry points share one construction path.
    List<CommitCallback> noCallbacks = Collections.emptyList();
    return newCommit(commitUser, noCallbacks);
}

@Override
public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> callbacks) {
return new FileStoreCommitImpl(
fileIO,
schemaManager,
Expand All @@ -205,7 +212,8 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.branch(),
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism());
options.scanManifestParallelism(),
callbacks);
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -80,6 +81,8 @@ public interface FileStore<T> extends Serializable {

FileStoreCommit newCommit(String commitUser);

FileStoreCommit newCommit(String commitUser, List<CommitCallback> callbacks);

SnapshotDeletion newSnapshotDeletion();

ChangelogDeletion newChangelogDeletion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,13 +172,14 @@ public void dropPartition(Identifier identifier, Map<String, String> partitionSp
throws TableNotExistException {
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit =
try (FileStoreCommit commit =
fileStoreTable
.store()
.newCommit(
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.dropPartitions(
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
commit.dropPartitions(
Collections.singletonList(partitionSpec), BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}

protected abstract void createDatabaseImpl(String name, Map<String, String> properties);
Expand Down Expand Up @@ -354,8 +355,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else {
Table table = getDataTable(identifier);
return table;
return getDataTable(identifier);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitCallback;
Expand All @@ -45,6 +46,8 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -111,18 +114,26 @@ public IcebergCommitCallback(FileStoreTable table, String commitUser) {
}

@Override
public void call(List<ManifestCommittable> committables) {
for (ManifestCommittable committable : committables) {
try {
commitMetadata(committable);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
public void call(
        List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
    // Only the commit identifier is used here; the entries and the watermark are not read.
    try {
        commitMetadata(identifier);
    } catch (IOException ioFailure) {
        // This method declares no checked exceptions, so rethrow unchecked with the cause kept.
        throw new UncheckedIOException(ioFailure);
    }
}

/**
 * Re-runs the metadata commit for a committable, using only its identifier.
 *
 * @param committable the committable whose identifier is passed to {@code commitMetadata}
 * @throws UncheckedIOException if {@code commitMetadata} fails with an {@link IOException}
 */
@Override
public void retry(ManifestCommittable committable) {
    try {
        long identifier = committable.identifier();
        commitMetadata(identifier);
    } catch (IOException ioFailure) {
        // Keep the original I/O failure as the cause.
        throw new UncheckedIOException(ioFailure);
    }
}

private void commitMetadata(ManifestCommittable committable) throws IOException {
Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(committable.identifier());
private void commitMetadata(long identifier) throws IOException {
Pair<Long, Long> pair = getCurrentAndBaseSnapshotIds(identifier);
long currentSnapshot = pair.getLeft();
Long baseSnapshot = pair.getRight();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package org.apache.paimon.metastore;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;

import org.apache.paimon.shade.guava30.com.google.common.cache.Cache;
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.List;

Expand All @@ -48,9 +52,18 @@ public AddPartitionCommitCallback(MetastoreClient client) {
}

@Override
public void call(List<ManifestCommittable> committables) {
committables.stream()
.flatMap(c -> c.fileCommittables().stream())
public void call(
        List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
    // Register each distinct partition that received at least one ADD entry in this commit.
    // The identifier and watermark are not used by this callback.
    committedEntries.stream()
            .filter(entry -> FileKind.ADD.equals(entry.kind()))
            .map(entry -> entry.partition())
            .distinct()
            .forEach(partition -> addPartition(partition));
}

@Override
public void retry(ManifestCommittable committable) {
committable.fileCommittables().stream()
.map(CommitMessage::partition)
.distinct()
.forEach(this::addPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.paimon.metastore;

import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.tag.TagPreview;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

Expand All @@ -37,12 +40,19 @@ public TagPreviewCommitCallback(AddPartitionTagCallback tagCallback, TagPreview
}

@Override
public void call(List<ManifestCommittable> committables) {
public void call(
        List<ManifestEntry> committedEntries, long identifier, @Nullable Long watermark) {
    // Derive a tag from the current wall-clock time and the commit watermark; when one is
    // produced, notify the tag callback. The entries and identifier are not used.
    long now = System.currentTimeMillis();
    tagPreview.extractTag(now, watermark).ifPresent(tagCallback::notifyCreation);
}

@Override
public void retry(ManifestCommittable committable) {
long currentMillis = System.currentTimeMillis();
for (ManifestCommittable c : committables) {
Optional<String> tagOptional = tagPreview.extractTag(currentMillis, c.watermark());
tagOptional.ifPresent(tagCallback::notifyCreation);
}
Optional<String> tagOptional =
tagPreview.extractTag(currentMillis, committable.watermark());
tagOptional.ifPresent(tagCallback::notifyCreation);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,17 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

/** Commit operation which provides commit and overwrite. */
public interface FileStoreCommit {
public interface FileStoreCommit extends AutoCloseable {

/** With global lock. */
FileStoreCommit withLock(Lock lock);

FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);

/** Find out which commit identifier need to be retried when recovering from the failure. */
Set<Long> filterCommitted(Set<Long> commitIdentifiers);
/** Find out which committables need to be retried when recovering from the failure. */
List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables);

/** Commit from manifest committable. */
void commit(ManifestCommittable committable, Map<String, String> properties);
Expand Down Expand Up @@ -88,4 +87,7 @@ void overwrite(
FileStorePathFactory pathFactory();

FileIO fileIO();

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,12 @@
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -121,6 +123,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private final Comparator<InternalRow> keyComparator;
private final String branchName;
@Nullable private final Integer manifestReadParallelism;
private final List<CommitCallback> commitCallbacks;

@Nullable private Lock lock;
private boolean ignoreEmptyCommit;
Expand Down Expand Up @@ -152,7 +155,8 @@ public FileStoreCommitImpl(
String branchName,
StatsFileHandler statsFileHandler,
BucketMode bucketMode,
@Nullable Integer manifestReadParallelism) {
@Nullable Integer manifestReadParallelism,
List<CommitCallback> commitCallbacks) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.commitUser = commitUser;
Expand All @@ -172,6 +176,7 @@ public FileStoreCommitImpl(
this.keyComparator = keyComparator;
this.branchName = branchName;
this.manifestReadParallelism = manifestReadParallelism;
this.commitCallbacks = commitCallbacks;

this.lock = null;
this.ignoreEmptyCommit = true;
Expand All @@ -193,25 +198,33 @@ public FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit) {
}

@Override
public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committables) {
// nothing to filter, fast exit
if (commitIdentifiers.isEmpty()) {
return commitIdentifiers;
if (committables.isEmpty()) {
return committables;
}

for (int i = 1; i < committables.size(); i++) {
Preconditions.checkArgument(
committables.get(i).identifier() > committables.get(i - 1).identifier(),
"Committables must be sorted according to identifiers before filtering. This is unexpected.");
}

Optional<Snapshot> latestSnapshot = snapshotManager.latestSnapshotOfUser(commitUser);
if (latestSnapshot.isPresent()) {
Set<Long> result = new HashSet<>();
for (Long identifier : commitIdentifiers) {
List<ManifestCommittable> result = new ArrayList<>();
for (ManifestCommittable committable : committables) {
// if committable is newer than latest snapshot, then it hasn't been committed
if (identifier > latestSnapshot.get().commitIdentifier()) {
result.add(identifier);
if (committable.identifier() > latestSnapshot.get().commitIdentifier()) {
result.add(committable);
} else {
commitCallbacks.forEach(callback -> callback.retry(committable));
}
}
return result;
} else {
// if there is no previous snapshots then nothing should be filtered
return commitIdentifiers;
return committables;
}
}

Expand Down Expand Up @@ -986,6 +999,7 @@ public boolean tryCommitOnce(
identifier,
commitKind.name()));
}
commitCallbacks.forEach(callback -> callback.call(tableFiles, identifier, watermark));
return true;
}

Expand Down Expand Up @@ -1229,6 +1243,13 @@ private void cleanUpTmpManifests(
}
}

/**
 * Closes every commit callback held by this commit through {@code IOUtils.closeQuietly}.
 *
 * <p>NOTE(review): the helper's name suggests close failures are suppressed; confirm against
 * {@code IOUtils}.
 */
@Override
public void close() {
    commitCallbacks.forEach(callback -> IOUtils.closeQuietly(callback));
}

private static class LevelIdentifier {

private final BinaryRow partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
Expand Down Expand Up @@ -139,6 +140,12 @@ public FileStoreCommit newCommit(String commitUser) {
return wrapped.newCommit(commitUser);
}

/**
 * Creates a commit with the given callbacks after checking insert privilege.
 *
 * <p>The privilege check on {@code identifier} runs first, so the wrapped store is not asked
 * for a commit unless the check passes.
 */
@Override
public FileStoreCommit newCommit(String commitUser, List<CommitCallback> callbacks) {
    privilegeChecker.assertCanInsert(identifier);
    FileStoreCommit delegateCommit = wrapped.newCommit(commitUser, callbacks);
    return delegateCommit;
}

@Override
public SnapshotDeletion newSnapshotDeletion() {
privilegeChecker.assertCanInsert(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,7 @@ public TableCommitImpl newCommit(String commitUser) {
}

return new TableCommitImpl(
store().newCommit(commitUser),
createCommitCallbacks(commitUser),
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
snapshotExpire,
options.writeOnly() ? null : store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
Expand All @@ -389,7 +388,7 @@ private List<CommitCallback> createCommitCallbacks(String commitUser) {

if (options.partitionedTableInMetastore()
&& metastoreClientFactory != null
&& tableSchema.partitionKeys().size() > 0) {
&& !tableSchema.partitionKeys().isEmpty()) {
callbacks.add(new AddPartitionCommitCallback(metastoreClientFactory.create()));
}

Expand Down
Loading
Loading