Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support configuring lock in paimon catalog #2933

Merged
merged 7 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@
<td>Boolean</td>
<td>Enable Catalog Lock.</td>
</tr>
<tr>
<td><h5>lock.type</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Lock Type for Catalog, such as 'hive', 'zookeeper'.</td>
</tr>
<tr>
<td><h5>metastore</h5></td>
<td style="word-wrap: break-word;">"filesystem"</td>
<td>String</td>
<td>Metastore of paimon catalog, supports filesystemhive and jdbc.</td>
<td>Metastore of paimon catalog, supports filesystem, hive and jdbc.</td>
</tr>
<tr>
<td><h5>table.type</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class CatalogOptions {
.stringType()
.defaultValue("filesystem")
.withDescription(
"Metastore of paimon catalog, supports filesystemhive and jdbc.");
"Metastore of paimon catalog, supports filesystem, hive and jdbc.");

public static final ConfigOption<String> URI =
ConfigOptions.key("uri")
Expand All @@ -60,6 +60,12 @@ public class CatalogOptions {
.defaultValue(false)
.withDescription("Enable Catalog Lock.");

public static final ConfigOption<String> LOCK_TYPE =
ConfigOptions.key("lock.type")
.stringType()
.noDefaultValue()
.withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'.");

public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -80,6 +83,30 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;

if (lockEnabled()) {
checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled.");
}
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLock.LockFactory.class,
catalogOptions.get(LOCK_TYPE)))
: Optional.empty();
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new OptionLockContext(catalogOptions));
}

protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

@Override
Expand Down Expand Up @@ -465,4 +492,12 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

static class OptionLockContext implements CatalogLock.LockContext {
private final Options catalogOptions;

public OptionLockContext(Options catalogOptions) {
this.catalogOptions = catalogOptions;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand All @@ -34,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand All @@ -56,11 +56,6 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
this.warehouse = warehouse;
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return Optional.empty();
}

@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
Expand Down Expand Up @@ -128,8 +123,7 @@ private boolean tableExists(Path tablePath) {

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Path path = getDataTableLocation(identifier);
return new SchemaManager(fileIO, path)
return schemaManager(identifier)
.latest()
.orElseThrow(() -> new TableNotExistException(identifier));
}
Expand All @@ -142,8 +136,24 @@ protected void dropTableImpl(Identifier identifier) {

@Override
public void createTableImpl(Identifier identifier, Schema schema) {
uncheck(() -> schemaManager(identifier).createTable(schema));
}

private SchemaManager schemaManager(Identifier identifier) {
Path path = getDataTableLocation(identifier);
uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
CatalogLock catalogLock =
lockFactory()
.map(
fac ->
fac.create(
lockContext()
.orElseThrow(
() ->
new RuntimeException(
"No lock context when lock is enabled."))))
.orElse(null);
return new SchemaManager(fileIO, path)
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

@Override
Expand All @@ -156,7 +166,7 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes);
schemaManager(identifier).commitChanges(changes);
}

private static <T> T uncheck(Callable<T> callable) {
Expand Down
15 changes: 4 additions & 11 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -55,7 +56,6 @@
import static org.apache.paimon.jdbc.JdbcUtils.execute;
import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;

/* This file is based on source code from the Iceberg Project (http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
Expand All @@ -76,7 +76,7 @@ public class JdbcCatalog extends AbstractCatalog {

protected JdbcCatalog(
FileIO fileIO, String catalogKey, Map<String, String> config, String warehouse) {
super(fileIO);
super(fileIO, Options.fromMap(config));
this.catalogKey = catalogKey;
this.options = config;
this.warehouse = warehouse;
Expand Down Expand Up @@ -347,15 +347,8 @@ public boolean caseSensitive() {
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(JdbcCatalogLock.createFactory(connections, catalogKey, options))
: Optional.empty();
}

private boolean lockEnabled() {
return Boolean.parseBoolean(
options.getOrDefault(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;

/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {
Expand All @@ -36,7 +40,13 @@ public String identifier() {

@Override
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY);
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
if (options.get(LOCK_ENABLED)) {
if (!options.getOptional(LOCK_TYPE).isPresent()) {
options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
}
}
return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,11 @@ public void close() throws IOException {
// Do nothing
}

/** Create a jdbc lock factory. */
public static LockFactory createFactory(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
return new JdbcCatalogLockFactory(connections, catalogName, conf);
}

private static class JdbcCatalogLockFactory implements LockFactory {
/** Jdbc catalog lock factory. */
public static class JdbcCatalogLockFactory implements LockFactory {

private static final long serialVersionUID = 1L;
private static final String IDENTIFIER = "jdbc";
private final JdbcClientPool connections;
private final String catalogName;
private final Map<String, String> conf;

public JdbcCatalogLockFactory(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
this.connections = connections;
this.catalogName = catalogName;
this.conf = conf;
}
public static final String IDENTIFIER = "jdbc";

@Override
public String identifier() {
Expand All @@ -114,8 +99,25 @@ public String identifier() {

@Override
public CatalogLock create(LockContext context) {
JdbcLockContext lockContext = (JdbcLockContext) context;
return new JdbcCatalogLock(
connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf));
lockContext.connections,
lockContext.catalogName,
checkMaxSleep(lockContext.conf),
acquireTimeout(lockContext.conf));
}
}

static class JdbcLockContext implements LockContext {
private final JdbcClientPool connections;
private final String catalogName;
private final Map<String, String> conf;

public JdbcLockContext(
JdbcClientPool connections, String catalogName, Map<String, String> conf) {
this.connections = connections;
this.catalogName = catalogName;
this.conf = conf;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@

org.apache.paimon.catalog.FileSystemCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogFactory
org.apache.paimon.jdbc.JdbcCatalogLock$JdbcCatalogLockFactory
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private JdbcCatalog initCatalog(Map<String, String> props) {
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
properties.putAll(props);
JdbcCatalog catalog = new JdbcCatalog(fileIO, "test-jdbc-catalog", properties, warehouse);
assertThat(catalog.warehouse()).isEqualTo(warehouse);
Expand Down
Loading
Loading