Skip to content

Commit

Permalink
[core] Support configuring lock in paimon catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
FangYongs committed Mar 1, 2024
1 parent 9723618 commit c0f0446
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public class CatalogOptions {
.defaultValue(false)
.withDescription("Enable Catalog Lock.");

public static final ConfigOption<String> LOCK_TYPE =
ConfigOptions.key("lock.type")
.stringType()
.noDefaultValue()
.withDescription("The Lock Type for Catalog, such as 'hive', 'zookeeper'.");

public static final ConfigOption<Duration> LOCK_CHECK_MAX_SLEEP =
key("lock-check-max-sleep")
.durationType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;

Expand Down Expand Up @@ -80,6 +83,30 @@ protected AbstractCatalog(FileIO fileIO, Options options) {
this.tableDefaultOptions =
convertToPropertiesPrefixKey(options.toMap(), TABLE_DEFAULT_OPTION_PREFIX);
this.catalogOptions = options;

if (lockEnabled()) {
checkArgument(options.contains(LOCK_TYPE), "No lock type when lock is enabled.");
}
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return lockEnabled()
? Optional.of(
FactoryUtil.discoverFactory(
AbstractCatalog.class.getClassLoader(),
CatalogLock.LockFactory.class,
catalogOptions.get(LOCK_TYPE)))
: Optional.empty();
}

@Override
public Optional<CatalogLock.LockContext> lockContext() {
return Optional.of(new OptionLockContext(catalogOptions));
}

protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

@Override
Expand Down Expand Up @@ -465,4 +492,12 @@ private void validateAutoCreateClose(Map<String, String> options) {
"The value of %s property should be %s.",
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

static class OptionLockContext implements CatalogLock.LockContext {
private final Options catalogOptions;

public OptionLockContext(Options catalogOptions) {
this.catalogOptions = catalogOptions;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand All @@ -34,7 +35,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;

import static org.apache.paimon.catalog.FileSystemCatalogOptions.CASE_SENSITIVE;
Expand All @@ -56,11 +56,6 @@ public FileSystemCatalog(FileIO fileIO, Path warehouse, Options options) {
this.warehouse = warehouse;
}

@Override
public Optional<CatalogLock.LockFactory> lockFactory() {
return Optional.empty();
}

@Override
public List<String> listDatabases() {
List<String> databases = new ArrayList<>();
Expand Down Expand Up @@ -128,8 +123,7 @@ private boolean tableExists(Path tablePath) {

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
Path path = getDataTableLocation(identifier);
return new SchemaManager(fileIO, path)
return schemaManager(identifier)
.latest()
.orElseThrow(() -> new TableNotExistException(identifier));
}
Expand All @@ -142,8 +136,24 @@ protected void dropTableImpl(Identifier identifier) {

@Override
public void createTableImpl(Identifier identifier, Schema schema) {
uncheck(() -> schemaManager(identifier).createTable(schema));
}

private SchemaManager schemaManager(Identifier identifier) {
Path path = getDataTableLocation(identifier);
uncheck(() -> new SchemaManager(fileIO, path).createTable(schema));
CatalogLock catalogLock =
lockFactory()
.map(
fac ->
fac.create(
lockContext()
.orElseThrow(
() ->
new RuntimeException(
"No lock context when lock is enabled."))))
.orElse(null);
return new SchemaManager(fileIO, path)
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

@Override
Expand All @@ -156,7 +166,7 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
new SchemaManager(fileIO, getDataTableLocation(identifier)).commitChanges(changes);
schemaManager(identifier).commitChanges(changes);
}

private static <T> T uncheck(Callable<T> callable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
Expand All @@ -35,15 +36,19 @@
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** ITCase for {@link FlinkCatalog}. */
public class FileSystemCatalogITCase extends AbstractTestBase {
private static final AtomicInteger LOCK_COUNT = new AtomicInteger(0);

private static final String DB_NAME = "default";

Expand Down Expand Up @@ -113,7 +118,7 @@ public void testCatalogOptionsInheritAndOverride() throws Exception {
+ "'table-default.opt2'='value2', "
+ "'table-default.opt3'='value3', "
+ "'fs.allow-hadoop-fallback'='false',"
+ "'lock.enabled'='true'"
+ "'lock.enabled'='false'"
+ ")",
path));
tEnv.useCatalog("fs_with_options");
Expand Down Expand Up @@ -146,6 +151,39 @@ public void testCatalogOptionsInheritAndOverride() throws Exception {
assertThat(tableOptions).doesNotContainKey("lock.enabled");
}

@Test
void testCatalogWithLockForSchema() throws Exception {
LOCK_COUNT.set(0);
assertThatThrownBy(
() ->
tEnv.executeSql(
String.format(
"CREATE CATALOG fs_with_lock WITH ("
+ "'type'='paimon', "
+ "'warehouse'='%s', "
+ "'lock.enabled'='true'"
+ ")",
path))
.await())
.hasRootCauseMessage("No lock type when lock is enabled.");
tEnv.executeSql(
String.format(
"CREATE CATALOG fs_with_lock WITH ("
+ "'type'='paimon', "
+ "'warehouse'='%s', "
+ "'lock.enabled'='true',"
+ "'lock.type'='DUMMY'"
+ ")",
path))
.await();
tEnv.useCatalog("fs_with_lock");
tEnv.executeSql("CREATE TABLE table1 (a STRING, b STRING, c STRING)").await();
tEnv.executeSql("CREATE TABLE table2 (a STRING, b STRING, c STRING)").await();
tEnv.executeSql("CREATE TABLE table3 (a STRING, b STRING, c STRING)").await();
tEnv.executeSql("DROP TABLE table3").await();
assertThat(LOCK_COUNT.get()).isEqualTo(3);
}

private void innerTestWriteRead() throws Exception {
tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', '6')").await();
BlockingIterator<Row, Row> iterator =
Expand All @@ -163,4 +201,29 @@ private List<Row> collect(String sql) throws Exception {
}
return result;
}

/** Lock factory for file system catalog. */
public static class FileSystemCatalogDummyLockFactory implements CatalogLock.LockFactory {
private static final String IDENTIFIER = "DUMMY";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public CatalogLock create(CatalogLock.LockContext context) {
return new CatalogLock() {
@Override
public <T> T runWithLock(String database, String table, Callable<T> callable)
throws Exception {
LOCK_COUNT.incrementAndGet();
return callable.call();
}

@Override
public void close() throws IOException {}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@
org.apache.paimon.flink.FlinkCatalogTest$TestingLogSoreRegisterFactory

# Lineage meta factory
org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory
org.apache.paimon.flink.FlinkLineageITCase$TestingMemoryLineageMetaFactory

# Catalog lock factory
org.apache.paimon.flink.FileSystemCatalogITCase$FileSystemCatalogDummyLockFactory
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@
import java.util.stream.Collectors;

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
import static org.apache.paimon.hive.HiveCatalogLock.LOCK_IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -144,6 +145,18 @@ public HiveCatalog(
locationHelper = new StorageLocationHelper();
}

/** Hive catalog only support hive lock. */
if (lockEnabled()) {
Optional<String> lockType = catalogOptions.getOptional(LOCK_TYPE);
if (lockType.isPresent()) {
checkArgument(
LOCK_IDENTIFIER.equals(lockType.get()),
"Hive catalog only support hive lock type");
} else {
catalogOptions.set(LOCK_TYPE, LOCK_IDENTIFIER);
}
}

this.client = createClient(hiveConf, clientClassName);
}

Expand All @@ -159,11 +172,6 @@ public Optional<CatalogLock.LockContext> lockContext() {
new SerializableHiveConf(hiveConf), clientClassName));
}

private boolean lockEnabled() {
return Boolean.parseBoolean(
hiveConf.get(LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString()));
}

@Override
public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) {
try {
Expand Down Expand Up @@ -677,7 +685,8 @@ public static boolean isEmbeddedMetastore(HiveConf hiveConf) {

public static Catalog createHiveCatalog(CatalogContext context) {
HiveConf hiveConf = createHiveConf(context);
String warehouseStr = context.options().get(CatalogOptions.WAREHOUSE);
Options options = context.options();
String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
if (warehouseStr == null) {
warehouseStr =
hiveConf.get(METASTOREWAREHOUSE.varname, METASTOREWAREHOUSE.defaultStrVal);
Expand All @@ -697,8 +706,8 @@ public static Catalog createHiveCatalog(CatalogContext context) {
return new HiveCatalog(
fileIO,
hiveConf,
context.options().get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
context.options(),
options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
options,
warehouse.toUri().toString());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
/** Hive {@link CatalogLock}. */
public class HiveCatalogLock implements CatalogLock {

static final String LOCK_IDENTIFIER = "hive";

private final IMetaStoreClient client;
private final long checkMaxSleep;
private final long acquireTimeout;
Expand Down Expand Up @@ -121,8 +123,6 @@ private static class HiveCatalogLockFactory implements LockFactory {

private static final long serialVersionUID = 1L;

private static final String IDENTIFIER = "hive";

@Override
public CatalogLock create(LockContext context) {
checkArgument(context instanceof HiveLockContext);
Expand All @@ -136,7 +136,7 @@ public CatalogLock create(LockContext context) {

@Override
public String identifier() {
return IDENTIFIER;
return LOCK_IDENTIFIER;
}
}

Expand Down

0 comments on commit c0f0446

Please sign in to comment.