Skip to content

Commit

Permalink
[core] Support configuring lock in paimon catalog (apache#2933)
Browse files Browse the repository at this point in the history
* [core] Support configuring lock in paimon catalog
  • Loading branch information
FangYongs authored Mar 15, 2024
1 parent 8338cad commit 8a0aec6
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 67 deletions.
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@
<td>Boolean</td>
<td>Enable Catalog Lock.</td>
</tr>
<tr>
<td><h5>lock.type</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Lock Type for Catalog, such as 'hive', 'zookeeper'.</td>
</tr>
<tr>
<td><h5>metastore</h5></td>
<td style="word-wrap: break-word;">"filesystem"</td>
<td>String</td>
<td>Metastore of paimon catalog, supports filesystemhive and jdbc.</td>
<td>Metastore of paimon catalog, supports filesystem, hive and jdbc.</td>
</tr>
<tr>
<td><h5>table.type</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class CatalogOptions {
.stringType()
.defaultValue("filesystem")
.withDescription(
"Metastore of paimon catalog, supports filesystemhive and jdbc.");
"Metastore of paimon catalog, supports filesystem, hive and jdbc.");

public static final ConfigOption<String> URI =
ConfigOptions.key("uri")
Expand All @@ -60,6 +60,12 @@ public class CatalogOptions {
.defaultValue(false)
.withDescription("Enable Catalog Lock.");

public static final ConfigOption<String> LOCK_TYPE =
ConfigOptions.key("lock.type")
.stringType()
.noDefaultValue()
.withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'.");

public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -80,6 +83,30 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;

if (lockEnabled()) {
checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled.");
}
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLock.LockFactory.class,
catalogOptions.get(LOCK_TYPE)))
: Optional.empty();
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new OptionLockContext(catalogOptions));
}

protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

@Override
Expand Down Expand Up @@ -465,4 +492,12 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

static class OptionLockContext implements CatalogLock.LockContext {
private final Options catalogOptions;

public OptionLockContext(Options catalogOptions) {
this.catalogOptions = catalogOptions;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand All @@ -34,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand All @@ -56,11 +56,6 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
this.warehouse = warehouse;
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return Optional.empty();
}

@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
Expand Down Expand Up @@ -128,8 +123,7 @@ private boolean tableExists(Path tablePath) {

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Path path = getDataTableLocation(identifier);
return new SchemaManager(fileIO, path)
return schemaManager(identifier)
.latest()
.orElseThrow(() -> new TableNotExistException(identifier));
}
Expand All @@ -142,8 +136,24 @@ protected void dropTableImpl(Identifier identifier) {

@Override
public void createTableImpl(Identifier identifier, Schema schema) {
uncheck(() -> schemaManager(identifier).createTable(schema));
}

private SchemaManager schemaManager(Identifier identifier) {
Path path = getDataTableLocation(identifier);
uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
CatalogLock catalogLock =
lockFactory()
.map(
fac ->
fac.create(
lockContext()
.orElseThrow(
() ->
new RuntimeException(
"No lock context when lock is enabled."))))
.orElse(null);
return new SchemaManager(fileIO, path)
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

@Override
Expand All @@ -156,7 +166,7 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes);
schemaManager(identifier).commitChanges(changes);
}

private static <T> T uncheck(Callable<T> callable) {
Expand Down
15 changes: 4 additions & 11 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -55,7 +56,6 @@
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
Expand All @@ -76,7 +76,7 @@ public class JdbcCatalog extends AbstractCatalog {

protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config, String warehouse) {
super(fileIO);
super(fileIO, Options.fromMap(config));
this.catalogKey = catalogKey;
this.options = config;
this.warehouse = warehouse;
Expand Down Expand Up @@ -347,15 +347,8 @@ public boolean caseSensitive() {
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(JdbcCatalogLock.createFactory(connections, catalogKey, options))
: Optional.empty();
}

private boolean lockEnabled() {
return Boolean.parseBoolean(
options.getOrDefault(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;

/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {
Expand All @@ -36,7 +40,13 @@ public String identifier() {

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY);
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
if (options.get(LOCK_ENABLED)) {
if (!options.getOptional(LOCK_TYPE).isPresent()) {
options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
}
}
return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,11 @@ public void close() throws IOException {
// Do nothing
}

/** Create a jdbc lock factory. */
public static LockFactory createFactory(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
return new JdbcCatalogLockFactory(connections, catalogName, conf);
}

private static class JdbcCatalogLockFactory implements LockFactory {
/** Jdbc catalog lock factory. */
public static class JdbcCatalogLockFactory implements LockFactory {

private static final long serialVersionUID = 1L;
private static final String IDENTIFIER = "jdbc";
private final JdbcClientPool connections;
private final String catalogName;
private final Map<String, String> conf;

public JdbcCatalogLockFactory(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
this.connections = connections;
this.catalogName = catalogName;
this.conf = conf;
}
public static final String IDENTIFIER = "jdbc";

@Override
public String identifier() {
Expand All @@ -114,8 +99,25 @@ public String identifier() {

@Override
public CatalogLock create(LockContext context) {
JdbcLockContext lockContext = (JdbcLockContext) context;
return new JdbcCatalogLock(
connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf));
lockContext.connections,
lockContext.catalogName,
checkMaxSleep(lockContext.conf),
acquireTimeout(lockContext.conf));
}
}

static class JdbcLockContext implements LockContext {
private final JdbcClientPool connections;
private final String catalogName;
private final Map<String, String> conf;

public JdbcLockContext(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
this.connections = connections;
this.catalogName = catalogName;
this.conf = conf;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@

org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private JdbcCatalog initCatalog(Map<String, String> props) {
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
Expand Down
Loading

0 comments on commit 8a0aec6

Please sign in to comment.