Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support commit isolation level #3805

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1303,6 +1303,14 @@ public class CoreOptions implements Serializable {
"The maximum number of concurrent deleting files. "
+ "By default is the number of processors available to the Java virtual machine.");

public static final ConfigOption<CommitIsolationLevel> COMMIT_ISOLATION_LEVEL =
key("commit.isolation-level")
.enumType(CommitIsolationLevel.class)
.defaultValue(CommitIsolationLevel.READ_COMMITTED)
.withDescription(
"Controls the isolation level of the commit operation, "
+ "with a relaxed isolation level being used by default.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -2568,6 +2576,12 @@ public enum RangeStrategy {
QUANTITY
}

/** Commit isolation level. */
public enum CommitIsolationLevel {
READ_COMMITTED,
SERIALIZABLE
}

/** Specifies the log consistency mode for table. */
public enum ConsumerMode implements DescribedEnum {
EXACTLY_ONCE(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.exception;

/** When this exception is thrown, the commit fails and some data cleanup can be performed. */
public class CommitFailedException extends RuntimeException {
public CommitFailedException(String msg) {
super(msg);
}

public CommitFailedException(Throwable e) {
super(e);
}

public CommitFailedException(String msg, Throwable e) {
super(msg, e);
}

public CommitFailedException(Throwable e, String msg, Object... args) {
super(String.format(msg, args), e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.exception;

/**
* When this exception is thrown, you should stop whatever you are doing. Don't clean/delete
* anything.This is because we can't be sure that the commit was successful.
*/
public class CommitStateUnknownException extends RuntimeException {
public CommitStateUnknownException(String msg) {
super(msg);
}

public CommitStateUnknownException(Throwable e) {
super(e);
}

public CommitStateUnknownException(String msg, Throwable e) {
super(msg, e);
}

public CommitStateUnknownException(Throwable e, String msg, Object... args) {
super(String.format(msg, args), e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.exception;

/**
* This means that in the case of concurrent operations, a client has a dirty commit. In this case,
* we should clean up the commit and stop it.
*/
public class DirtyCommitException extends RuntimeException {
public DirtyCommitException(String msg) {
super(msg);
}

public DirtyCommitException(Throwable e) {
super(e);
}

public DirtyCommitException(String msg, Throwable e) {
super(msg, e);
}

public DirtyCommitException(Throwable e, String msg, Object... args) {
super(String.format(msg, args), e);
}
}
3 changes: 3 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ default boolean tryToWriteAtomic(Path path, String content) throws IOException {
try {
writeFile(tmp, content, false);
success = rename(tmp, path);
} catch (IOException e) {
// try check once
success = exists(path) && !exists(tmp);
Copy link
Contributor Author

@BsoBird BsoBird Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After throwing an IO exception, it's a good idea to check it again.Currently, this checking strategy is not fully applicable to object storage systems and may result in false positives.However, as the industry evolves, object storage systems are starting to support mutex operations.In the long run, I think we need to add check operations.

However, please note that we can only check for IO exceptions after they are caught, because they are thrown by the server, and we should not do any checking for client-side exceptions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a separate PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a separate PR?

sure

} finally {
if (!success) {
deleteQuietly(tmp);
Expand Down
24 changes: 24 additions & 0 deletions paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,30 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<type>jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<type>jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<type>jar</type>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
Expand Down Expand Up @@ -205,7 +206,10 @@ public FileStoreCommitImpl newCommit(String commitUser) {
options.branch(),
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism());
options.scanManifestParallelism(),
schema.options(),
newBranchManager(),
newTagManager());
}

@Override
Expand Down Expand Up @@ -240,6 +244,12 @@ public TagManager newTagManager() {
return new TagManager(fileIO, options.path());
}

@Override
public BranchManager newBranchManager() {
return new BranchManager(
fileIO, options.path(), snapshotManager(), newTagManager(), schemaManager);
}

@Override
public TagDeletion newTagDeletion() {
return new TagDeletion(
Expand Down
3 changes: 3 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/FileStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -86,6 +87,8 @@ public interface FileStore<T> extends Serializable {

TagManager newTagManager();

BranchManager newBranchManager();

TagDeletion newTagDeletion();

@Nullable
Expand Down
Loading