Skip to content

Commit

Permalink
[core] Fix jdbc catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs committed Mar 8, 2024
1 parent 99ce494 commit 4e1b3d1
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class JdbcCatalog extends AbstractCatalog {

protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config, String warehouse) {
super(fileIO);
super(fileIO, Options.fromMap(config));
this.catalogKey = StringUtils.isBlank(catalogKey) ? "jdbc" : catalogKey;
this.options = config;
this.warehouse = warehouse;
Expand Down Expand Up @@ -347,10 +348,8 @@ public boolean caseSensitive() {
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(JdbcCatalogLock.createFactory(connections, catalogKey, options))
: Optional.empty();
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;

/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {
Expand All @@ -36,7 +40,13 @@ public String identifier() {

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY);
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
if (options.get(LOCK_ENABLED)) {
if (!options.getOptional(LOCK_TYPE).isPresent()) {
options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
}
}
return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,11 @@ public void close() throws IOException {
// Do nothing
}

/** Create a jdbc lock factory. */
public static LockFactory createFactory(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
return new JdbcCatalogLockFactory(connections, catalogName, conf);
}

private static class JdbcCatalogLockFactory implements LockFactory {
/** Jdbc catalog lock factory. */
public static class JdbcCatalogLockFactory implements LockFactory {

private static final long serialVersionUID = 1L;
private static final String IDENTIFIER = "jdbc";
private final JdbcClientPool connections;
private final String catalogName;
private final Map<String, String> conf;

public JdbcCatalogLockFactory(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
this.connections = connections;
this.catalogName = catalogName;
this.conf = conf;
}
public static final String IDENTIFIER = "jdbc";

@Override
public String identifier() {
Expand All @@ -114,8 +99,25 @@ public String identifier() {

@Override
public CatalogLock create(LockContext context) {
JdbcLockContext lockContext = (JdbcLockContext) context;
return new JdbcCatalogLock(
connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf));
lockContext.connections,
lockContext.catalogName,
checkMaxSleep(lockContext.conf),
acquireTimeout(lockContext.conf));
}
}

static class JdbcLockContext implements LockContext {
private final JdbcClientPool connections;
private final String catalogName;
private final Map<String, String> conf;

public JdbcLockContext(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
this.connections = connections;
this.catalogName = catalogName;
this.conf = conf;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@

org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private JdbcCatalog initCatalog(Map<String, String> props) {
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
Expand Down

0 comments on commit 4e1b3d1

Please sign in to comment.