Skip to content

Commit

Permalink
Support common jdbc catalog lock for filesystem catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 28, 2024
1 parent 3237e1a commit 11154c3
Show file tree
Hide file tree
Showing 20 changed files with 334 additions and 21 deletions.
11 changes: 11 additions & 0 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ USE CATALOG my_catalog;

You can define any default table options with the prefix `table-default.` for tables created in the catalog.


The FileSystem catalog supports jdbc lock and can take effect through the following configuration:

> ```shell
> 'lock.uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
> 'lock.jdbc.user' = '...',
> 'lock.jdbc.password' = '...',
> ```
{{< /tab >}}
{{< tab "Spark3" >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class CatalogOptions {
ConfigOptions.key("lock.type")
.stringType()
.noDefaultValue()
.withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'.");
.withDescription(
"The Lock Type for Catalog, such as 'hive', 'zookeeper', 'jdbc'.");

public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public abstract class AbstractCatalog implements Catalog {
public static final String DB_SUFFIX = ".db";
protected static final String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
protected static final String DB_LOCATION_PROP = "location";
protected static final String LOCK_PROP_PREFIX = "lock.";

protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
Expand Down Expand Up @@ -105,6 +106,30 @@ public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

@Override
public Optional<CatalogLockContextFactory> lockContextFactory() {
String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return Optional.empty();
}
return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLockContextFactory.class,
lock));
}

public Options extractLockConfiguration(Map<String, String> properties) {
Map<String, String> result = new HashMap<>();
properties.forEach(
(key, value) -> {
if (key.startsWith(LOCK_PROP_PREFIX)) {
result.put(key.substring(LOCK_PROP_PREFIX.length()), value);
}
});
return Options.fromMap(result);
}

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public interface Catalog extends AutoCloseable {
*/
Optional<CatalogLockFactory> lockFactory();

/** Get lock context factory for lock factory to create a lock. */
default Optional<CatalogLockContextFactory> lockContextFactory() {
return Optional.empty();
}

/** Get lock context for lock factory to create a lock. */
default Optional<CatalogLockContext> lockContext() {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.factories.Factory;
import org.apache.paimon.options.Options;

import java.io.Serializable;

/** Context for lock context factory to create lock. */
public interface CatalogLockContextFactory extends Factory, Serializable {
CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand Down Expand Up @@ -198,4 +199,13 @@ public String warehouse() {
public boolean caseSensitive() {
return catalogOptions.get(CASE_SENSITIVE);
}

@Override
public Optional<CatalogLockContext> lockContext() {
return lockContextFactory()
.map(
factory ->
factory.createLockContext(
extractLockConfiguration(catalogOptions.toMap()), true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.TableType;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;

/** Factory to create {@link FileSystemCatalog}. */
Expand All @@ -40,6 +42,9 @@ public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
throw new IllegalArgumentException(
"Only managed table is supported in File system catalog.");
}
if (context.options().get(LOCK_ENABLED) && context.options().get(LOCK_TYPE) == null) {
throw new IllegalArgumentException("Please configure the lock type correctly.");
}
return new FileSystemCatalog(fileIO, warehouse, context.options());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public Optional<CatalogLockFactory> defaultLockFactory() {

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
return Optional.of(new JdbcCatalogLockContext(connections, options, false));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class JdbcCatalogLock implements CatalogLock {
private final long checkMaxSleep;
private final long acquireTimeout;
private final String catalogKey;
private final boolean closeConnectionsUsed;

public JdbcCatalogLock(
JdbcClientPool connections,
Expand All @@ -46,6 +47,20 @@ public JdbcCatalogLock(
this.checkMaxSleep = checkMaxSleep;
this.acquireTimeout = acquireTimeout;
this.catalogKey = catalogKey;
this.closeConnectionsUsed = false;
}

public JdbcCatalogLock(
JdbcClientPool connections,
String catalogKey,
long checkMaxSleep,
long acquireTimeout,
boolean closeConnectionsUsed) {
this.connections = connections;
this.checkMaxSleep = checkMaxSleep;
this.acquireTimeout = acquireTimeout;
this.catalogKey = catalogKey;
this.closeConnectionsUsed = closeConnectionsUsed;
}

@Override
Expand Down Expand Up @@ -83,7 +98,9 @@ private void lock(String lockUniqueName) throws SQLException, InterruptedExcepti

@Override
public void close() throws IOException {
// Do nothing
if (closeConnectionsUsed) {
connections.close();
}
}

public static long checkMaxSleep(Map<String, String> conf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,59 @@
package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import java.sql.SQLException;

/** Jdbc lock context. */
public class JdbcCatalogLockContext implements CatalogLockContext {

private final JdbcClientPool connections;
private JdbcClientPool connections;
private final boolean closeConnectionsUsed;
private final String catalogKey;
private final Options options;

public JdbcCatalogLockContext(JdbcClientPool connections, String catalogKey, Options options) {
this.connections = connections;
this.catalogKey = catalogKey;
public JdbcCatalogLockContext(Options options, boolean closeConnectionsUsed) {
this.options = options;
this.catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
this.closeConnectionsUsed = closeConnectionsUsed;
}

JdbcCatalogLockContext(
JdbcClientPool connections, Options options, boolean closeConnectionsUsed) {
this(options, closeConnectionsUsed);
this.connections = connections;
}

@Override
public Options options() {
return options;
}

public JdbcClientPool connections() {
return connections;
}

public String catalogKey() {
return catalogKey;
}

public boolean isCloseConnectionsUsed() {
return closeConnectionsUsed;
}

public JdbcClientPool clientPool() {
if (this.connections == null) {
this.connections =
new JdbcClientPool(
options.get(CatalogOptions.CLIENT_POOL_SIZE),
options.get(CatalogOptions.URI.key()),
options.toMap());
try {
JdbcUtils.createDistributedLockTable(connections, options);
} catch (SQLException e) {
throw new RuntimeException("Cannot initialize JDBC distributed lock.", e);
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted in call to initialize", e);
}
}
return connections;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.jdbc;

import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockContextFactory;
import org.apache.paimon.options.Options;

/** Factory for jdbc catalog lock context. */
public class JdbcCatalogLockContextFactory implements CatalogLockContextFactory {

@Override
public String identifier() {
return JdbcCatalogLockFactory.IDENTIFIER;
}

@Override
public CatalogLockContext createLockContext(Options lockOptions, boolean closeConnectionsUsed) {
return new JdbcCatalogLockContext(lockOptions, closeConnectionsUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public CatalogLock createLock(CatalogLockContext context) {
JdbcCatalogLockContext lockContext = (JdbcCatalogLockContext) context;
Map<String, String> optionsMap = lockContext.options().toMap();
return new JdbcCatalogLock(
lockContext.connections(),
lockContext.clientPool(),
lockContext.catalogKey(),
checkMaxSleep(optionsMap),
acquireTimeout(optionsMap));
acquireTimeout(optionsMap),
lockContext.isCloseConnectionsUsed());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogLockFactory
org.apache.paimon.jdbc.JdbcCatalogLockContextFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.fs.Path;
import org.apache.paimon.jdbc.JdbcCatalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;

import org.junit.jupiter.api.BeforeEach;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link FileSystemCatalog}. */
public class FileSystemCatalogTest extends CatalogTestBase {

@BeforeEach
public void setUp() throws Exception {
super.setUp();
catalog = initCatalog(Maps.newHashMap());
}

private FileSystemCatalog initCatalog(Map<String, String> props) {
Map<String, String> properties = Maps.newHashMap();
properties.put(
AbstractCatalog.LOCK_PROP_PREFIX + CatalogOptions.URI.key(),
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));

properties.put(
AbstractCatalog.LOCK_PROP_PREFIX + JdbcCatalog.PROPERTY_PREFIX + "username",
"user");
properties.put(
AbstractCatalog.LOCK_PROP_PREFIX + JdbcCatalog.PROPERTY_PREFIX + "password",
"password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
FileSystemCatalog catalog =
new FileSystemCatalog(fileIO, new Path(warehouse), Options.fromMap(properties));
return catalog;
}

@Override
public void testListDatabasesWhenNoDatabases() {
List<String> databases = catalog.listDatabases();
assertThat(databases).isEqualTo(new ArrayList<>());
}
}
Loading

0 comments on commit 11154c3

Please sign in to comment.