Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 25, 2024
1 parent eece701 commit b728230
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -47,6 +48,8 @@ public class FileSystemCatalog extends AbstractCatalog {

private final Path warehouse;

private ClientPool.ClientPoolImpl clientPool;

public FileSystemCatalog(FileIO fileIO, Path warehouse) {
super(fileIO);
this.warehouse = warehouse;
Expand Down Expand Up @@ -159,7 +162,10 @@ private SchemaManager schemaManager(Identifier identifier) {

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return LockContextUtils.lockContext(catalogOptions, "filesystem");
if (clientPool == null) {
this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions);
}
return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem");
}

@Override
Expand Down Expand Up @@ -194,7 +200,7 @@ private static String database(Path path) {

@Override
public void close() throws Exception {
LockContextUtils.close();
LockContextUtils.close(clientPool);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.catalog;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.jdbc.JdbcCatalogFactory;
import org.apache.paimon.jdbc.JdbcCatalogLock;
import org.apache.paimon.jdbc.JdbcClientPool;
Expand All @@ -36,18 +37,15 @@ public class LockContextUtils {

private static final Logger LOG = LoggerFactory.getLogger(FileSystemCatalog.class);

private static JdbcClientPool connections;

public static Optional<CatalogLock.LockContext> lockContext(
Options catalogOptions, String catalogKey) {
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
if (lockType == null) {
ClientPool.ClientPoolImpl clientPool, Options catalogOptions, String catalogKey) {
if (clientPool == null) {
return Optional.of(new AbstractCatalog.OptionLockContext(catalogOptions));
}
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
switch (lockType) {
case JdbcCatalogFactory.IDENTIFIER:
// Try init jdbc connections.
tryInitializeJdbcConnections(catalogOptions);
JdbcClientPool connections = (JdbcClientPool) clientPool;
return Optional.of(
new JdbcCatalogLock.JdbcLockContext(
connections, catalogKey, catalogOptions));
Expand All @@ -57,27 +55,44 @@ public static Optional<CatalogLock.LockContext> lockContext(
}
}

private static void tryInitializeJdbcConnections(Options catalogOptions) {
if (connections == null) {
connections =
new JdbcClientPool(
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
catalogOptions.get(CatalogOptions.URI.key()),
catalogOptions.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, catalogOptions);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
public static ClientPool.ClientPoolImpl tryInitializeClientPool(Options catalogOptions) {
String lockType = catalogOptions.get(CatalogOptions.LOCK_TYPE);
if (lockType == null) {
return null;
}
switch (lockType) {
case JdbcCatalogFactory.IDENTIFIER:
JdbcClientPool connections =
new JdbcClientPool(
catalogOptions.get(CatalogOptions.CLIENT_POOL_SIZE),
catalogOptions.get(CatalogOptions.URI.key()),
catalogOptions.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, catalogOptions);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
return connections;
default:
LOG.warn("Unsupported lock type:" + lockType);
return null;
}
}

public static void close() {
if (connections != null && !connections.isClosed()) {
connections.close();
connections = null;
public static void close(ClientPool.ClientPoolImpl clientPool) {
if (clientPool == null) {
return;
}
if (clientPool instanceof JdbcClientPool) {
JdbcClientPool connections = (JdbcClientPool) clientPool;
if (!connections.isClosed()) {
connections.close();
}
} else {
clientPool.close();
}
clientPool = null;
}
}

0 comments on commit b728230

Please sign in to comment.