Skip to content

Commit

Permalink
[core] add table name to the log when commit failed (#4475)
Browse files Browse the repository at this point in the history
  • Loading branch information
melin authored Nov 7, 2024
1 parent 048882a commit 6c7b7bf
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
protected final FileIO fileIO;
protected final SchemaManager schemaManager;
protected final TableSchema schema;
protected final String tableName;
protected final CoreOptions options;
protected final RowType partitionType;
private final CatalogEnvironment catalogEnvironment;
Expand All @@ -83,12 +84,14 @@ protected AbstractFileStore(
FileIO fileIO,
SchemaManager schemaManager,
TableSchema schema,
String tableName,
CoreOptions options,
RowType partitionType,
CatalogEnvironment catalogEnvironment) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.tableName = tableName;
this.options = options;
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
Expand Down Expand Up @@ -209,6 +212,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
return new FileStoreCommitImpl(
fileIO,
schemaManager,
tableName,
commitUser,
partitionType,
options.partitionDefaultName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {

private final RowType bucketKeyType;
private final RowType rowType;
private final String tableName;

public AppendOnlyFileStore(
FileIO fileIO,
Expand All @@ -61,10 +60,9 @@ public AppendOnlyFileStore(
RowType rowType,
String tableName,
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
this.bucketKeyType = bucketKeyType;
this.rowType = rowType;
this.tableName = tableName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
private final Supplier<Comparator<InternalRow>> keyComparatorSupplier;
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final String tableName;

public KeyValueFileStore(
FileIO fileIO,
Expand All @@ -86,7 +85,7 @@ public KeyValueFileStore(
MergeFunctionFactory<KeyValue> mfFactory,
String tableName,
CatalogEnvironment catalogEnvironment) {
super(fileIO, schemaManager, schema, options, partitionType, catalogEnvironment);
super(fileIO, schemaManager, schema, tableName, options, partitionType, catalogEnvironment);
this.crossPartitionUpdate = crossPartitionUpdate;
this.bucketKeyType = bucketKeyType;
this.keyType = keyType;
Expand All @@ -99,7 +98,6 @@ public KeyValueFileStore(
options.changelogRowDeduplicate()
? ValueEqualiserSupplier.fromIgnoreFields(valueType, logDedupIgnoreFields)
: () -> null;
this.tableName = tableName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {

private final FileIO fileIO;
private final SchemaManager schemaManager;
private final String tableName;
private final String commitUser;
private final RowType partitionType;
private final String partitionDefaultName;
Expand Down Expand Up @@ -142,6 +143,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
public FileStoreCommitImpl(
FileIO fileIO,
SchemaManager schemaManager,
String tableName,
String commitUser,
RowType partitionType,
String partitionDefaultName,
Expand All @@ -165,6 +167,7 @@ public FileStoreCommitImpl(
int commitMaxRetries) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.tableName = tableName;
this.commitUser = commitUser;
this.partitionType = partitionType;
this.partitionDefaultName = partitionDefaultName;
Expand Down Expand Up @@ -1331,8 +1334,9 @@ private void assertNoDelete(
for (SimpleFileEntry entry : mergedEntries) {
Preconditions.checkState(
entry.kind() != FileKind.DELETE,
"Trying to delete file %s which is not previously added.",
entry.fileName());
"Trying to delete file %s for table %s which is not previously added.",
entry.fileName(),
tableName);
}
} catch (Throwable e) {
if (partitionExpire != null && partitionExpire.isValueExpiration()) {
Expand Down

0 comments on commit 6c7b7bf

Please sign in to comment.