Skip to content

Commit

Permalink
[core] Refactor catalog lock factory for paimon Factory
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs committed Feb 18, 2024
1 parent e8aa707 commit 35f4fcc
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE
getDataTableLocation(identifier),
tableSchema,
new CatalogEnvironment(
Lock.factory(lockFactory().orElse(null), identifier),
Lock.factory(
lockFactory().orElse(null), lockContext().orElse(null), identifier),
metastoreClientFactory(identifier).orElse(null),
lineageMetaFactory));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ public interface Catalog extends AutoCloseable {
* Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
* object store.
*/
Optional<CatalogLock.Factory> lockFactory();
Optional<CatalogLock.LockFactory> lockFactory();

/** Get lock context for lock factory to create a lock. */
default Optional<CatalogLock.LockContext> lockContext() {
return Optional.empty();
}

/** Get metastore client factory for the table specified by {@code identifier}. */
default Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.catalog;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.factories.Factory;

import java.io.Closeable;
import java.io.Serializable;
Expand All @@ -36,7 +37,10 @@ public interface CatalogLock extends Closeable {
<T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

/** Factory to create {@link CatalogLock}. */
interface Factory extends Serializable {
CatalogLock create();
interface LockFactory extends Factory, Serializable {
CatalogLock create(LockContext context);
}

/** Context for lock factory to create lock. */
interface LockContext extends Serializable {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
}

@Override
public Optional<CatalogLock.Factory> lockFactory() {
public Optional<CatalogLock.LockFactory> lockFactory() {
return Optional.empty();
}

Expand Down
18 changes: 13 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/operation/Lock.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ interface Factory extends Serializable {
Lock create();
}

static Factory factory(@Nullable CatalogLock.Factory lockFactory, Identifier tablePath) {
static Factory factory(
@Nullable CatalogLock.LockFactory lockFactory,
@Nullable CatalogLock.LockContext lockContext,
Identifier tablePath) {
return lockFactory == null
? new EmptyFactory()
: new CatalogLockFactory(lockFactory, tablePath);
: new CatalogLockFactory(lockFactory, lockContext, tablePath);
}

static Factory emptyFactory() {
Expand All @@ -58,17 +61,22 @@ class CatalogLockFactory implements Factory {

private static final long serialVersionUID = 1L;

private final CatalogLock.Factory lockFactory;
private final CatalogLock.LockFactory lockFactory;
private final CatalogLock.LockContext lockContext;
private final Identifier tablePath;

public CatalogLockFactory(CatalogLock.Factory lockFactory, Identifier tablePath) {
public CatalogLockFactory(
CatalogLock.LockFactory lockFactory,
CatalogLock.LockContext lockContext,
Identifier tablePath) {
this.lockFactory = lockFactory;
this.lockContext = lockContext;
this.tablePath = tablePath;
}

@Override
public Lock create() {
return fromCatalog(lockFactory.create(), tablePath);
return fromCatalog(lockFactory.create(lockContext), tablePath);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,15 @@ public HiveCatalog(
}

@Override
public Optional<CatalogLock.Factory> lockFactory() {
return lockEnabled()
? Optional.of(HiveCatalogLock.createFactory(hiveConf, clientClassName))
: Optional.empty();
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled() ? Optional.of(HiveCatalogLock.createFactory()) : Optional.empty();
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(
new HiveCatalogLock.HiveLockContext(
new SerializableHiveConf(hiveConf), clientClassName));
}

private boolean lockEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT;
import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Hive {@link CatalogLock}. */
public class HiveCatalogLock implements CatalogLock {
Expand Down Expand Up @@ -112,30 +113,31 @@ public void close() {
}

/** Create a hive lock factory. */
public static CatalogLock.Factory createFactory(HiveConf hiveConf, String clientClassName) {
return new HiveCatalogLockFactory(hiveConf, clientClassName);
public static LockFactory createFactory() {
return new HiveCatalogLockLockFactory();
}

private static class HiveCatalogLockFactory implements CatalogLock.Factory {
private static class HiveCatalogLockLockFactory implements LockFactory {

private static final long serialVersionUID = 1L;

private final SerializableHiveConf hiveConf;
private final String clientClassName;

public HiveCatalogLockFactory(HiveConf hiveConf, String clientClassName) {
this.hiveConf = new SerializableHiveConf(hiveConf);
this.clientClassName = clientClassName;
}
private static final String IDENTIFIER = "hive";

@Override
public CatalogLock create() {
HiveConf conf = hiveConf.conf();
public CatalogLock create(LockContext context) {
checkArgument(context instanceof HiveLockContext);
HiveLockContext hiveLockContext = (HiveLockContext) context;
HiveConf conf = hiveLockContext.hiveConf.conf();
return new HiveCatalogLock(
HiveCatalog.createClient(conf, clientClassName),
HiveCatalog.createClient(conf, hiveLockContext.clientClassName),
checkMaxSleep(conf),
acquireTimeout(conf));
}

@Override
public String identifier() {
return IDENTIFIER;
}
}

public static long checkMaxSleep(HiveConf conf) {
Expand All @@ -153,4 +155,14 @@ public static long acquireTimeout(HiveConf conf) {
TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue())))
.toMillis();
}

static class HiveLockContext implements LockContext {
private final SerializableHiveConf hiveConf;
private final String clientClassName;

public HiveLockContext(SerializableHiveConf hiveConf, String clientClassName) {
this.hiveConf = hiveConf;
this.clientClassName = clientClassName;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -689,11 +689,9 @@ public void testAlterTable() throws Exception {
@Test
public void testHiveLock() throws InterruptedException {
tEnv.executeSql("CREATE TABLE t (a INT)");
CatalogLock.Factory lockFactory =
((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get())
.catalog()
.lockFactory()
.get();
Catalog catalog =
((FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
CatalogLock.LockFactory lockFactory = catalog.lockFactory().get();

AtomicInteger count = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
Expand All @@ -708,7 +706,7 @@ public void testHiveLock() throws InterruptedException {
Thread thread =
new Thread(
() -> {
CatalogLock lock = lockFactory.create();
CatalogLock lock = lockFactory.create(catalog.lockContext().get());
for (int j = 0; j < 10; j++) {
try {
lock.runWithLock("test_db", "t", unsafeIncrement);
Expand Down

0 comments on commit 35f4fcc

Please sign in to comment.