Skip to content

Commit

Permalink
[catalog] Refactor Catalog Factory and revert jdbc lock in FileSystem…
Browse files Browse the repository at this point in the history
…Catalog (apache#3099)
  • Loading branch information
JingsongLi authored and zhu3pang committed Mar 29, 2024
1 parent 78b1ace commit 4c17a3a
Show file tree
Hide file tree
Showing 23 changed files with 331 additions and 355 deletions.
10 changes: 0 additions & 10 deletions docs/content/how-to/creating-catalogs.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,6 @@ USE CATALOG my_catalog;

You can define any default table options with the prefix `table-default.` for tables created in the catalog.

The FileSystem catalog supports jdbc lock and can take effect through the following configuration:

> ```shell
> 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>'
> 'jdbc.user' = '...',
> 'jdbc.password' = '...',
> ```
{{< /tab >}}

{{< tab "Spark3" >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,26 +83,31 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;
}

if (lockEnabled()) {
checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled.");
@Override
public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
}

String lock = catalogOptions.get(LOCK_TYPE);
if (lock == null) {
return defaultLockFactory();
}

return Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock));
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLock.LockFactory.class,
catalogOptions.get(LOCK_TYPE)))
: Optional.empty();
public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.empty();
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new OptionLockContext(catalogOptions));
public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
}

protected boolean lockEnabled() {
Expand Down Expand Up @@ -498,12 +503,4 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

static class OptionLockContext implements CatalogLock.LockContext {
private final Options catalogOptions;

public OptionLockContext(Options catalogOptions) {
this.catalogOptions = catalogOptions;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public interface Catalog extends AutoCloseable {
* Get lock factory from catalog. Lock is used to support multiple concurrent writes on the
* object store.
*/
Optional<CatalogLock.LockFactory> lockFactory();
Optional<CatalogLockFactory> lockFactory();

/** Get lock context for lock factory to create a lock. */
default Optional<CatalogLock.LockContext> lockContext() {
default Optional<CatalogLockContext> lockContext() {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.paimon.catalog;

import org.apache.paimon.annotation.Public;
import org.apache.paimon.factories.Factory;

import java.io.Closeable;
import java.io.Serializable;
import java.util.concurrent.Callable;

/**
Expand All @@ -35,12 +33,4 @@ public interface CatalogLock extends Closeable {

/** Run with catalog lock. The caller should tell catalog the database and table name. */
<T> T runWithLock(String database, String table, Callable<T> callable) throws Exception;

/** Factory to create {@link CatalogLock}. */
interface LockFactory extends Factory, Serializable {
CatalogLock create(LockContext context);
}

/** Context for lock factory to create lock. */
interface LockContext extends Serializable {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.options.Options;

import java.io.Serializable;

/** Context for lock factory to create lock. */
public interface CatalogLockContext extends Serializable {

Options options();

static CatalogLockContext fromOptions(Options options) {
return () -> options;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.catalog;

import org.apache.paimon.factories.Factory;

import java.io.Serializable;

/** Factory to create {@link CatalogLock}. */
public interface CatalogLockFactory extends Factory, Serializable {

CatalogLock createLock(CatalogLockContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.catalog;

import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand All @@ -36,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand All @@ -48,8 +46,6 @@ public class FileSystemCatalog extends AbstractCatalog {

private final Path warehouse;

private ClientPool.ClientPoolImpl clientPool;

public FileSystemCatalog(FileIO fileIO, Path warehouse) {
super(fileIO);
this.warehouse = warehouse;
Expand Down Expand Up @@ -161,7 +157,7 @@ private SchemaManager schemaManager(Identifier identifier) {
lockFactory()
.map(
fac ->
fac.create(
fac.createLock(
lockContext()
.orElseThrow(
() ->
Expand All @@ -172,14 +168,6 @@ private SchemaManager schemaManager(Identifier identifier) {
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
if (clientPool == null) {
this.clientPool = LockContextUtils.tryInitializeClientPool(catalogOptions);
}
return LockContextUtils.lockContext(this.clientPool, catalogOptions, "filesystem");
}

@Override
public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
Expand Down Expand Up @@ -211,9 +199,7 @@ private static String database(Path path) {
}

@Override
public void close() throws Exception {
LockContextUtils.close(clientPool);
}
public void close() throws Exception {}

@Override
public String warehouse() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
Expand Down Expand Up @@ -344,8 +345,13 @@ public boolean caseSensitive() {
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new JdbcCatalogLock.JdbcLockContext(connections, catalogKey, options));
public Optional<CatalogLockFactory> defaultLockFactory() {
return Optional.of(new JdbcCatalogLockFactory());
}

@Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(new JdbcCatalogLockContext(connections, catalogKey, options));
}

private Lock lock(Identifier identifier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;

import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;

/** Factory to create {@link JdbcCatalog}. */
public class JdbcCatalogFactory implements CatalogFactory {

Expand All @@ -42,11 +39,6 @@ public String identifier() {
public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
Options options = context.options();
String catalogKey = options.get(JdbcCatalogOptions.CATALOG_KEY);
if (options.get(LOCK_ENABLED)) {
if (!options.getOptional(LOCK_TYPE).isPresent()) {
options.set(LOCK_TYPE, JdbcCatalogLock.JdbcCatalogLockFactory.IDENTIFIER);
}
}
return new JdbcCatalog(fileIO, catalogKey, context.options(), warehouse.toString());
}
}
Loading

0 comments on commit 4c17a3a

Please sign in to comment.