Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core]commit failed, add table name to the log #4478

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
protected final FileIO fileIO;
protected final SchemaManager schemaManager;
protected final TableSchema schema;
protected final String tableName;
protected final CoreOptions options;
protected final RowType partitionType;
private final CatalogEnvironment catalogEnvironment;
Expand All @@ -83,12 +84,14 @@ protected AbstractFileStore(
FileIO fileIO,
SchemaManager schemaManager,
TableSchema schema,
String tableName,
CoreOptions options,
RowType partitionType,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.tableName = tableName;
this.options = options;
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
Expand Down Expand Up @@ -209,6 +212,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
return new FileStoreCommitImpl(
fileIO,
schemaManager,
tableName,
commitUser,
partitionType,
options.partitionDefaultName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {

private final RowType bucketKeyType;
private final RowType rowType;
private final String tableName;

public AppendOnlyFileStore(
FileIO fileIO,
Expand All @@ -61,10 +60,9 @@ public AppendOnlyFileStore(
RowType rowType,
String tableName,
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
this.tableName = tableName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final String tableName;

public KeyValueFileStore(
FileIO fileIO,
Expand All @@ -86,7 +85,7 @@ public KeyValueFileStore(
MergeFunctionFactory<KeyValue> mfFactory,
String tableName,
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
Expand All @@ -99,7 +98,6 @@ public KeyValueFileStore(
options.changelogRowDeduplicate()
? ValueEqualiserSupplier.fromIgnoreFields(valueType, logDedupIgnoreFields)
: () -> null;
this.tableName = tableName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {

private final FileIO fileIO;
private final SchemaManager schemaManager;
private final String tableName;
private final String commitUser;
private final RowType partitionType;
private final String partitionDefaultName;
Expand Down Expand Up @@ -142,6 +143,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
String tableName,
String commitUser,
RowType partitionType,
String partitionDefaultName,
Expand All @@ -165,6 +167,7 @@ public FileStoreCommitImpl(
int commitMaxRetries) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.tableName = tableName;
this.commitUser = commitUser;
this.partitionType = partitionType;
this.partitionDefaultName = partitionDefaultName;
Expand Down Expand Up @@ -1331,8 +1334,9 @@ private void assertNoDelete(
for (SimpleFileEntry entry : mergedEntries) {
Preconditions.checkState(
entry.kind() != FileKind.DELETE,
"Trying to delete file %s which is not previously added.",
entry.fileName());
"Trying to delete file %s for table %s which is not previously added.",
entry.fileName(),
tableName);
}
} catch (Throwable e) {
if (partitionExpire != null && partitionExpire.isValueExpiration()) {
Expand Down
Loading