diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index a3de5df31e56..43d23dc1f51a 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -30,6 +30,7 @@ Paimon catalogs currently support two types of metastores: * `filesystem` metastore (default), which stores both metadata and table files in filesystems. * `hive` metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive. +* `jdbc` metastore, which additionally stores metadata in relational databases such as MySQL, Postgres, etc. See [CatalogOptions]({{< ref "maintenance/configurations#catalogoptions" >}}) for detailed options when creating a catalog. @@ -175,3 +176,42 @@ Using the table option facilitates the convenient definition of Hive table param Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table. For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process. +## Creating a Catalog with JDBC Metastore + +By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as SQLite, MySQL, postgres, etc. + +Currently, lock configuration is only supported for MySQL and SQLite. If you are using a different type of database for catalog storage, please do not configure `lock.enabled`. + +{{< tabs "jdbc-metastore-example" >}} + +{{< tab "Flink" >}} + +Paimon JDBC Catalog in Flink needs to correctly add the corresponding jar package for connecting to the database. You should first download JDBC connector bundled jar and add it to classpath. 
such as MySQL, postgres + +| database type | Bundle Name | SQL Client JAR | +|:--------------|:---------------------|:---------------------------------------------------------------------------| +| mysql | mysql-connector-java | [Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) | +| postgres | postgresql | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) | + +```sql +CREATE CATALOG my_jdbc WITH ( + 'type' = 'paimon', + 'metastore' = 'jdbc', + 'uri' = 'jdbc:mysql://:/', + 'jdbc.user' = '...', + 'jdbc.password' = '...', + 'catalog-key'='jdbc', + 'warehouse' = 'hdfs:///path/to/warehouse' +); + +USE CATALOG my_jdbc; +``` +You can configure any connection parameters that have been declared by JDBC through "jdbc.", the connection parameters may be different between different databases, please configure according to the actual situation. + +You can also perform logical isolation for databases under multiple catalogs by specifying "catalog-key". + +You can define any default table options with the prefix `table-default.` for tables created in the catalog. + +{{< /tab >}} + +{{< /tabs >}} diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md index f48956462b6a..87d59cd34eff 100644 --- a/docs/content/maintenance/configurations.md +++ b/docs/content/maintenance/configurations.md @@ -50,6 +50,12 @@ Options for Hive catalog. {{< generated/hive_catalog_configuration >}} +### JdbcCatalogOptions + +Options for Jdbc catalog. + +{{< generated/jdbc_catalog_configuration >}} + ### FlinkCatalogOptions Flink catalog options for paimon. diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index d243737a8353..e685559447e2 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -26,6 +26,12 @@ + +
client-pool-size
+ 2 + Integer + Configure the size of the connection pool. +
fs.allow-hadoop-fallback
true @@ -60,7 +66,7 @@
metastore
"filesystem" String - Metastore of paimon catalog, supports filesystem and hive. + Metastore of paimon catalog, supports filesystem、hive and jdbc.
table.type
diff --git a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html new file mode 100644 index 000000000000..617018ce9f5b --- /dev/null +++ b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html @@ -0,0 +1,36 @@ +{{/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/}} + + + + + + + + + + + + + + + + + +
KeyDefaultTypeDescription
catalog-key
(none)StringCustom jdbc catalog store key.
diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java new file mode 100644 index 000000000000..f0e4f07417e7 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPool.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.client; + +/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */ +public interface ClientPool { + /** Action interface for client. */ + interface Action { + R run(C client) throws E; + } + + R run(Action action) throws E, InterruptedException; + + R run(Action action, boolean retry) throws E, InterruptedException; +} diff --git a/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java b/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java new file mode 100644 index 000000000000..dda336be2183 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/client/ClientPoolImpl.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.Deque; + +import static org.apache.paimon.utils.Preconditions.checkState; + +/** Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java]. 
*/ +public abstract class ClientPoolImpl + implements Closeable, ClientPool { + private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class); + + private final int poolSize; + private final Deque clients; + private final Class reconnectExc; + private final Object signal = new Object(); + private final boolean retryByDefault; + private volatile int currentSize; + private boolean closed; + + public ClientPoolImpl(int poolSize, Class reconnectExc, boolean retryByDefault) { + this.poolSize = poolSize; + this.reconnectExc = reconnectExc; + this.clients = new ArrayDeque<>(poolSize); + this.currentSize = 0; + this.closed = false; + this.retryByDefault = retryByDefault; + } + + @Override + public R run(Action action) throws E, InterruptedException { + return run(action, retryByDefault); + } + + @Override + public R run(Action action, boolean retry) throws E, InterruptedException { + C client = get(); + try { + return action.run(client); + } catch (Exception exc) { + if (retry && isConnectionException(exc)) { + try { + client = reconnect(client); + } catch (Exception ignored) { + // if reconnection throws any exception, rethrow the original failure + throw reconnectExc.cast(exc); + } + + return action.run(client); + } + + throw exc; + + } finally { + release(client); + } + } + + protected abstract C newClient(); + + protected abstract C reconnect(C client); + + protected boolean isConnectionException(Exception exc) { + return reconnectExc.isInstance(exc); + } + + protected abstract void close(C client); + + @Override + public void close() { + this.closed = true; + try { + while (currentSize > 0) { + if (!clients.isEmpty()) { + synchronized (this) { + if (!clients.isEmpty()) { + C client = clients.removeFirst(); + close(client); + currentSize -= 1; + } + } + } + if (clients.isEmpty() && currentSize > 0) { + synchronized (signal) { + // wake every second in case this missed the signal + signal.wait(1000); + } + } + } + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e); + } + } + + private C get() throws InterruptedException { + checkState(!closed, "Cannot get a client from a closed pool"); + while (true) { + if (!clients.isEmpty() || currentSize < poolSize) { + synchronized (this) { + if (!clients.isEmpty()) { + return clients.removeFirst(); + } else if (currentSize < poolSize) { + C client = newClient(); + currentSize += 1; + return client; + } + } + } + synchronized (signal) { + // wake every second in case this missed the signal + signal.wait(1000); + } + } + } + + private void release(C client) { + synchronized (this) { + clients.addFirst(client); + } + synchronized (signal) { + signal.notify(); + } + } + + public int poolSize() { + return poolSize; + } + + public boolean isClosed() { + return closed; + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 0fba499dd13b..42cd9e418844 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -39,7 +39,8 @@ public class CatalogOptions { ConfigOptions.key("metastore") .stringType() .defaultValue("filesystem") - .withDescription("Metastore of paimon catalog, supports filesystem and hive."); + .withDescription( + "Metastore of paimon catalog, supports filesystem、hive and jdbc."); public static final ConfigOption URI = ConfigOptions.key("uri") @@ -78,6 +79,12 @@ public class CatalogOptions { .withDescription( "Allow to fallback to hadoop File IO when no file io found for the scheme."); + public static final ConfigOption CLIENT_POOL_SIZE = + key("client-pool-size") + .intType() + .defaultValue(2) + .withDescription("Configure the size of the connection pool."); + public static final ConfigOption LINEAGE_META = key("lineage-meta") 
.stringType() diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index 0a0effb15fea..f4e028bdcb19 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -183,6 +183,13 @@ under the License. 3.6.1 + + org.xerial + sqlite-jdbc + 3.44.0.0 + test + + diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java new file mode 100644 index 000000000000..f3469d0b5ba5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** Jdbc distributed lock interface. 
*/ +public abstract class AbstractDistributedLockDialect implements JdbcDistributedLockDialect { + + @Override + public void createTable(JdbcClientPool connections) throws SQLException, InterruptedException { + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables( + null, null, JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(getCreateTableSql()).execute(); + }); + } + + public abstract String getCreateTableSql(); + + @Override + public boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds) + throws SQLException, InterruptedException { + return connections.run( + connection -> { + try (PreparedStatement preparedStatement = + connection.prepareStatement(getLockAcquireSql())) { + preparedStatement.setString(1, lockId); + preparedStatement.setLong(2, timeoutMillSeconds / 1000); + return preparedStatement.executeUpdate() > 0; + } catch (SQLException ex) { + return false; + } + }); + } + + public abstract String getLockAcquireSql(); + + @Override + public boolean releaseLock(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException { + return connections.run( + connection -> { + try (PreparedStatement preparedStatement = + connection.prepareStatement(getReleaseLockSql())) { + preparedStatement.setString(1, lockId); + return preparedStatement.executeUpdate() > 0; + } + }); + } + + public abstract String getReleaseLockSql(); + + @Override + public int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException { + return connections.run( + connection -> { + try (PreparedStatement preparedStatement = + connection.prepareStatement(getTryReleaseTimedOutLock())) { + preparedStatement.setString(1, lockId); + return preparedStatement.executeUpdate(); + } + }); + } + + public abstract String getTryReleaseTimedOutLock(); +} 
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java new file mode 100644 index 000000000000..1978456386c0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +class DistributedLockDialectFactory { + static JdbcDistributedLockDialect create(String protocol) { + JdbcProtocol type = JdbcProtocol.valueOf(protocol.toUpperCase()); + switch (type) { + case SQLITE: + return new SqlLiteDistributedLockDialect(); + case MYSQL: + return new MysqlDistributedLockDialect(); + default: + throw new UnsupportedOperationException( + String.format("Distributed locks based on %s are not supported", protocol)); + } + } + + /** Supported jdbc protocol. */ + enum JdbcProtocol { + SQLITE, + // for mysql. 
+ MARIADB, + MYSQL; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java new file mode 100644 index 000000000000..5dc2abc9e416 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.Lock; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; +import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; +import static org.apache.paimon.jdbc.JdbcUtils.execute; +import static org.apache.paimon.jdbc.JdbcUtils.insertProperties; +import static org.apache.paimon.jdbc.JdbcUtils.updateTable; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; + +/** Support jdbc catalog. 
*/ +public class JdbcCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class); + public static final String PROPERTY_PREFIX = "jdbc."; + private static final String DATABASE_EXISTS_PROPERTY = "exists"; + private JdbcClientPool connections; + private String catalogKey = "jdbc"; + private Map configuration; + private final String warehouse; + + protected JdbcCatalog( + FileIO fileIO, String catalogKey, Map config, String warehouse) { + super(fileIO); + if (!StringUtils.isBlank(catalogKey)) { + this.catalogKey = catalogKey; + } + this.configuration = config; + this.warehouse = warehouse; + Preconditions.checkNotNull(configuration, "Invalid catalog properties: null"); + this.connections = + new JdbcClientPool( + Integer.parseInt( + config.getOrDefault( + CatalogOptions.CLIENT_POOL_SIZE.key(), + CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())), + configuration.get(CatalogOptions.URI.key()), + configuration); + try { + initializeCatalogTablesIfNeed(); + } catch (SQLException e) { + throw new RuntimeException("Cannot initialize JDBC catalog", e); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted in call to initialize", e); + } + } + + @VisibleForTesting + public JdbcClientPool getConnections() { + return connections; + } + + /** Initialize catalog tables. */ + private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException { + String uri = configuration.get(CatalogOptions.URI.key()); + Preconditions.checkNotNull(uri, "JDBC connection URI is required"); + // Check and create catalog table. + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute(); + }); + + // Check and create database properties table. 
+ connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables( + null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE) + .execute(); + }); + + // if lock enabled, Check and create distributed lock table. + if (lockEnabled()) { + JdbcUtils.createDistributedLockTable(connections); + } + } + + @Override + public String warehouse() { + return warehouse; + } + + @Override + public List listDatabases() { + List databases = Lists.newArrayList(); + databases.addAll( + fetch( + row -> row.getString(JdbcUtils.TABLE_DATABASE), + JdbcUtils.LIST_ALL_TABLE_DATABASES_SQL, + catalogKey)); + + databases.addAll( + fetch( + row -> row.getString(JdbcUtils.DATABASE_NAME), + JdbcUtils.LIST_ALL_PROPERTY_DATABASES_SQL, + catalogKey)); + return databases; + } + + @Override + protected boolean databaseExistsImpl(String databaseName) { + return JdbcUtils.databaseExists(connections, catalogKey, databaseName); + } + + @Override + protected Map loadDatabasePropertiesImpl(String databaseName) { + if (!databaseExists(databaseName)) { + throw new RuntimeException(String.format("Database does not exist: %s", databaseName)); + } + Map properties = Maps.newHashMap(); + properties.putAll(fetchProperties(databaseName)); + if (!properties.containsKey(DB_LOCATION_PROP)) { + properties.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName()); + } + properties.remove(DATABASE_EXISTS_PROPERTY); + return ImmutableMap.copyOf(properties); + } + + @Override + protected void createDatabaseImpl(String name, Map properties) { + if (databaseExists(name)) { + throw new RuntimeException(String.format("Database already exists: %s", name)); + } + + Map createProps = new HashMap<>(); + createProps.put(DATABASE_EXISTS_PROPERTY, "true"); + if (properties != null && !properties.isEmpty()) { + createProps.putAll(properties); + } + 
+ if (!createProps.containsKey(DB_LOCATION_PROP)) { + Path databasePath = newDatabasePath(name); + createProps.put(DB_LOCATION_PROP, databasePath.toString()); + } + insertProperties(connections, catalogKey, name, createProps); + } + + @Override + protected void dropDatabaseImpl(String name) { + // Delete table from paimon_tables + execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogKey, name); + // Delete properties from paimon_database_properties + execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogKey, name); + } + + @Override + protected List listTablesImpl(String databaseName) { + if (!databaseExists(databaseName)) { + throw new RuntimeException(String.format("Database does not exist: %s", databaseName)); + } + return fetch( + row -> row.getString(JdbcUtils.TABLE_NAME), + JdbcUtils.LIST_TABLES_SQL, + catalogKey, + databaseName); + } + + @Override + protected void dropTableImpl(Identifier identifier) { + try { + int deletedRecords = + execute( + connections, + JdbcUtils.DROP_TABLE_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + + if (deletedRecords == 0) { + LOG.info("Skipping drop, table does not exist: {}", identifier); + return; + } + Path path = getDataTableLocation(identifier); + try { + if (fileIO.exists(path)) { + fileIO.deleteDirectoryQuietly(path); + } + } catch (Exception ex) { + LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex); + } + } catch (Exception e) { + throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e); + } + } + + @Override + protected void createTableImpl(Identifier identifier, Schema schema) { + try { + // create table file + getSchemaManager(identifier).createTable(schema); + // Update schema metadata + Path path = getDataTableLocation(identifier); + int insertRecord = + connections.run( + conn -> { + try (PreparedStatement sql = + conn.prepareStatement( + JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) { + sql.setString(1, 
catalogKey); + sql.setString(2, identifier.getDatabaseName()); + sql.setString(3, identifier.getObjectName()); + return sql.executeUpdate(); + } + }); + if (insertRecord == 1) { + LOG.debug("Successfully committed to new table: {}", identifier); + } else { + try { + fileIO.deleteDirectoryQuietly(path); + } catch (Exception ee) { + LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + } + throw new RuntimeException( + String.format( + "Failed to create table %s in catalog %s", + identifier.getFullName(), catalogKey)); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); + } + } + + @Override + protected void renameTableImpl(Identifier fromTable, Identifier toTable) { + try { + // update table metadata info + updateTable(connections, catalogKey, fromTable, toTable); + + Path fromPath = getDataTableLocation(fromTable); + if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + // Rename the file system's table directory. Maintain consistency between tables in + // the file system and tables in the Hive Metastore. 
+ Path toPath = getDataTableLocation(toTable); + try { + fileIO.rename(fromPath, toPath); + } catch (IOException e) { + throw new RuntimeException( + "Failed to rename changes of table " + + toTable.getFullName() + + " to underlying files.", + e); + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e); + } + } + + @Override + protected void alterTableImpl(Identifier identifier, List changes) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + if (!tableExists(identifier)) { + throw new RuntimeException( + String.format("Table is not exists {}", identifier.getFullName())); + } + SchemaManager schemaManager = getSchemaManager(identifier); + schemaManager.commitChanges(changes); + } + + @Override + protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + if (!tableExists(identifier)) { + throw new TableNotExistException(identifier); + } + Path tableLocation = getDataTableLocation(identifier); + return new SchemaManager(fileIO, tableLocation) + .latest() + .orElseThrow( + () -> new RuntimeException("There is no paimon table in " + tableLocation)); + } + + @Override + public boolean tableExists(Identifier identifier) { + if (isSystemTable(identifier)) { + return super.tableExists(identifier); + } + return JdbcUtils.tableExists( + connections, catalogKey, identifier.getDatabaseName(), identifier.getObjectName()); + } + + @Override + public boolean caseSensitive() { + return false; + } + + @Override + public Optional lockFactory() { + return lockEnabled() + ? 
Optional.of(JdbcCatalogLock.createFactory(connections, catalogKey, configuration)) + : Optional.empty(); + } + + private boolean lockEnabled() { + return Boolean.parseBoolean( + configuration.getOrDefault( + LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString())); + } + + private Lock lock(Identifier identifier) { + if (!lockEnabled()) { + return new Lock.EmptyLock(); + } + JdbcCatalogLock lock = + new JdbcCatalogLock( + connections, + catalogKey, + checkMaxSleep(configuration), + acquireTimeout(configuration)); + return Lock.fromCatalog(lock, identifier); + } + + @Override + public void close() throws Exception { + if (!connections.isClosed()) { + connections.close(); + } + } + + private SchemaManager getSchemaManager(Identifier identifier) { + return new SchemaManager(fileIO, getDataTableLocation(identifier)) + .withLock(lock(identifier)); + } + + private Map fetchProperties(String databaseName) { + if (!databaseExists(databaseName)) { + throw new RuntimeException(String.format("Database does not exist: %s", databaseName)); + } + List> entries = + fetch( + row -> + new AbstractMap.SimpleImmutableEntry<>( + row.getString(JdbcUtils.DATABASE_PROPERTY_KEY), + row.getString(JdbcUtils.DATABASE_PROPERTY_VALUE)), + JdbcUtils.GET_ALL_DATABASE_PROPERTIES_SQL, + catalogKey, + databaseName); + return ImmutableMap.builder().putAll(entries).build(); + } + + @FunctionalInterface + interface RowProducer { + R apply(ResultSet result) throws SQLException; + } + + @SuppressWarnings("checkstyle:NestedTryDepth") + private List fetch(RowProducer toRow, String sql, String... 
args) { + try { + return connections.run( + conn -> { + List result = Lists.newArrayList(); + try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { + for (int pos = 0; pos < args.length; pos += 1) { + preparedStatement.setString(pos + 1, args[pos]); + } + try (ResultSet rs = preparedStatement.executeQuery()) { + while (rs.next()) { + result.add(toRow.apply(rs)); + } + } + } + return result; + }); + } catch (SQLException e) { + throw new RuntimeException(String.format("Failed to execute query: %s", sql), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java new file mode 100644 index 000000000000..1c791eb5e11e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +/** Factory to create {@link JdbcCatalog}. */ +public class JdbcCatalogFactory implements CatalogFactory { + + /** Identifier this factory reports from {@link #identifier()}. */ + public static final String IDENTIFIER = "jdbc"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + /** + * Creates a {@link JdbcCatalog} from the catalog options. + * + * @param fileIO file IO handed to the catalog + * @param warehouse warehouse location; forwarded to the catalog in its full string form + * @param context catalog context whose options supply the catalog key and the option map + */ + @Override + public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { + String catalogKey = context.options().get(JdbcCatalogOptions.CATALOG_KEY); + // getName() yields only the last path component, which would drop the scheme and the + // parent directories of the warehouse; pass the complete path instead. + return new JdbcCatalog(fileIO, catalogKey, context.options().toMap(), warehouse.toString()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java new file mode 100644 index 000000000000..94287cb6e0a0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.utils.TimeUtils; + +import java.io.IOException; +import java.sql.SQLException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.Callable; + +import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT; +import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP; + +/** Jdbc catalog lock. */ +public class JdbcCatalogLock implements CatalogLock { + private final JdbcClientPool connections; + private final long checkMaxSleep; + private final long acquireTimeout; + private final String catalogName; + + public JdbcCatalogLock( + JdbcClientPool connections, + String catalogName, + long checkMaxSleep, + long acquireTimeout) { + this.connections = connections; + this.checkMaxSleep = checkMaxSleep; + this.acquireTimeout = acquireTimeout; + this.catalogName = catalogName; + } + + @Override + public T runWithLock(String database, String table, Callable callable) throws Exception { + String lockUniqueName = String.format("%s.%s.%s", catalogName, database, table); + lock(lockUniqueName); + try { + return callable.call(); + } finally { + JdbcUtils.release(connections, lockUniqueName); + } + } + + private void lock(String lockUniqueName) throws SQLException, InterruptedException { + boolean lock = JdbcUtils.acquire(connections, lockUniqueName, acquireTimeout); + long nextSleep = 50; + long startRetry = System.currentTimeMillis(); + while (!lock) { + nextSleep *= 2; + if (nextSleep > checkMaxSleep) { + nextSleep = checkMaxSleep; + } + Thread.sleep(nextSleep); + lock = JdbcUtils.acquire(connections, lockUniqueName, acquireTimeout); + if (System.currentTimeMillis() - startRetry > acquireTimeout) { + break; + } + } + long retryDuration = System.currentTimeMillis() - startRetry; + if (!lock) { + throw new RuntimeException( + "Acquire lock failed with time: " + Duration.ofMillis(retryDuration)); + } + } + + 
@Override + public void close() throws IOException { + // Do nothing + } + + /** Create a jdbc lock factory. */ + public static LockFactory createFactory( + JdbcClientPool connections, String catalogName, Map conf) { + return new JdbcCatalogLockFactory(connections, catalogName, conf); + } + + private static class JdbcCatalogLockFactory implements LockFactory { + + private static final long serialVersionUID = 1L; + private static final String IDENTIFIER = "jdbc"; + private final JdbcClientPool connections; + private final String catalogName; + private final Map conf; + + public JdbcCatalogLockFactory( + JdbcClientPool connections, String catalogName, Map conf) { + this.connections = connections; + this.catalogName = catalogName; + this.conf = conf; + } + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public CatalogLock create(LockContext context) { + return new JdbcCatalogLock( + connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf)); + } + } + + public static long checkMaxSleep(Map conf) { + return TimeUtils.parseDuration( + conf.getOrDefault( + LOCK_CHECK_MAX_SLEEP.key(), + TimeUtils.getStringInMillis(LOCK_CHECK_MAX_SLEEP.defaultValue()))) + .toMillis(); + } + + public static long acquireTimeout(Map conf) { + return TimeUtils.parseDuration( + conf.getOrDefault( + LOCK_ACQUIRE_TIMEOUT.key(), + TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue()))) + .toMillis(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java new file mode 100644 index 000000000000..dd4afd473e22 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; + +/** Options for jdbc catalog. */ +public final class JdbcCatalogOptions { + + public static final ConfigOption CATALOG_KEY = + ConfigOptions.key("catalog-key") + .stringType() + .defaultValue(null) + .withDescription("Custom jdbc catalog store key."); + + private JdbcCatalogOptions() {} +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java new file mode 100644 index 000000000000..e1a4cccf1cfb --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.client.ClientPoolImpl; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.util.Map; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Client pool for jdbc. */ +public class JdbcClientPool extends ClientPoolImpl { + + private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)"); + private final String dbUrl; + private final Map properties; + private final String protocol; + + public JdbcClientPool(int poolSize, String dbUrl, Map props) { + super(poolSize, SQLNonTransientConnectionException.class, true); + properties = props; + this.dbUrl = dbUrl; + Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl); + if (matcher.matches()) { + this.protocol = matcher.group(1); + } else { + throw new RuntimeException("Invalid Jdbc url: " + dbUrl); + } + } + + @Override + protected Connection newClient() { + try { + Properties dbProps = + JdbcUtils.extractJdbcConfiguration(properties, JdbcCatalog.PROPERTY_PREFIX); + return DriverManager.getConnection(dbUrl, dbProps); + } catch (SQLException e) { + throw new RuntimeException(String.format("Failed to connect: %s", dbUrl), e); + } + } + + @Override + protected Connection reconnect(Connection client) { + close(client); + return newClient(); + } + + public String getProtocol() { + return protocol; + } + + @Override + protected void close(Connection client) { + try { + 
client.close(); + } catch (SQLException e) { + throw new RuntimeException("Failed to close connection", e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java new file mode 100644 index 000000000000..a691aac2295f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import java.sql.SQLException; + +/** Jdbc distributed lock interface. 
*/ +public interface JdbcDistributedLockDialect { + void createTable(JdbcClientPool connections) throws SQLException, InterruptedException; + + boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds) + throws SQLException, InterruptedException; + + boolean releaseLock(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException; + + int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java new file mode 100644 index 000000000000..7b9b93a5a4e2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.Identifier; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.util.Map; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** Util for jdbc catalog. */ +public class JdbcUtils { + private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + public static final String CATALOG_TABLE_NAME = "paimon_tables"; + public static final String CATALOG_KEY = "catalog_key"; + public static final String TABLE_DATABASE = "database_name"; + public static final String TABLE_NAME = "table_name"; + + static final String CREATE_CATALOG_TABLE = + "CREATE TABLE " + + CATALOG_TABLE_NAME + + "(" + + CATALOG_KEY + + " VARCHAR(255) NOT NULL," + + TABLE_DATABASE + + " VARCHAR(255) NOT NULL," + + TABLE_NAME + + " VARCHAR(255) NOT NULL," + + " PRIMARY KEY (" + + CATALOG_KEY + + ", " + + TABLE_DATABASE + + ", " + + TABLE_NAME + + ")" + + ")"; + static final String GET_TABLE_SQL = + "SELECT * FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? "; + static final String LIST_TABLES_SQL = + "SELECT * FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ?"; + + static final String DELETE_TABLES_SQL = + "DELETE FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ?"; + static final String RENAME_TABLE_SQL = + "UPDATE " + + CATALOG_TABLE_NAME + + " SET " + + TABLE_DATABASE + + " = ? , " + + TABLE_NAME + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? 
"; + static final String DROP_TABLE_SQL = + "DELETE FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? "; + static final String GET_DATABASE_SQL = + "SELECT " + + TABLE_DATABASE + + " FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? LIMIT 1"; + + static final String LIST_ALL_TABLE_DATABASES_SQL = + "SELECT DISTINCT " + + TABLE_DATABASE + + " FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ?"; + static final String DO_COMMIT_CREATE_TABLE_SQL = + "INSERT INTO " + + CATALOG_TABLE_NAME + + " (" + + CATALOG_KEY + + ", " + + TABLE_DATABASE + + ", " + + TABLE_NAME + + ") " + + " VALUES (?,?,?)"; + + // Catalog database Properties + static final String DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties"; + static final String DATABASE_NAME = "database_name"; + static final String DATABASE_PROPERTY_KEY = "property_key"; + static final String DATABASE_PROPERTY_VALUE = "property_value"; + + static final String CREATE_DATABASE_PROPERTIES_TABLE = + "CREATE TABLE " + + DATABASE_PROPERTIES_TABLE_NAME + + "(" + + CATALOG_KEY + + " VARCHAR(255) NOT NULL," + + DATABASE_NAME + + " VARCHAR(255) NOT NULL," + + DATABASE_PROPERTY_KEY + + " VARCHAR(255)," + + DATABASE_PROPERTY_VALUE + + " VARCHAR(1000)," + + "PRIMARY KEY (" + + CATALOG_KEY + + ", " + + DATABASE_NAME + + ", " + + DATABASE_PROPERTY_KEY + + ")" + + ")"; + static final String GET_DATABASE_PROPERTIES_SQL = + "SELECT " + + DATABASE_NAME + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + DATABASE_NAME + + " = ? 
"; + static final String INSERT_DATABASE_PROPERTIES_SQL = + "INSERT INTO " + + DATABASE_PROPERTIES_TABLE_NAME + + " (" + + CATALOG_KEY + + ", " + + DATABASE_NAME + + ", " + + DATABASE_PROPERTY_KEY + + ", " + + DATABASE_PROPERTY_VALUE + + ") VALUES "; + static final String INSERT_PROPERTIES_VALUES_BASE = "(?,?,?,?)"; + static final String GET_ALL_DATABASE_PROPERTIES_SQL = + "SELECT * " + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + DATABASE_NAME + + " = ? "; + static final String DELETE_ALL_DATABASE_PROPERTIES_SQL = + "DELETE FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + DATABASE_NAME + + " = ?"; + static final String LIST_ALL_PROPERTY_DATABASES_SQL = + "SELECT DISTINCT " + + DATABASE_NAME + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ?"; + + // Distributed locks table + static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; + static final String LOCK_ID = "lock_id"; + static final String ACQUIRED_AT = "acquired_at"; + static final String EXPIRE_TIME = "expire_time_seconds"; + + public static Properties extractJdbcConfiguration( + Map properties, String prefix) { + Properties result = new Properties(); + properties.forEach( + (key, value) -> { + if (key.startsWith(prefix)) { + result.put(key.substring(prefix.length()), value); + } + }); + return result; + } + + /** Get paimon table metadata. 
*/ + public static Map getTable( + JdbcClientPool connections, String storeKey, String databaseName, String tableName) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + Map table = Maps.newHashMap(); + + try (PreparedStatement sql = conn.prepareStatement(JdbcUtils.GET_TABLE_SQL)) { + sql.setString(1, storeKey); + sql.setString(2, databaseName); + sql.setString(3, tableName); + // The map stays empty when no row matches the key, database and table. + try (ResultSet rs = sql.executeQuery()) { + if (rs.next()) { + table.put(CATALOG_KEY, rs.getString(CATALOG_KEY)); + table.put(TABLE_DATABASE, rs.getString(TABLE_DATABASE)); + table.put(TABLE_NAME, rs.getString(TABLE_NAME)); + } + } + } + return table; + }); + } + + /** + * Renames a table by rewriting its row in the catalog table. + * + * @param connections client pool used to run the update + * @param storeKey catalog key the row is stored under + * @param fromTable current identifier of the table + * @param toTable identifier the table is renamed to + * @throws RuntimeException if the target already exists, the source row is missing, or the + * update fails + */ + public static void updateTable( + JdbcClientPool connections, String storeKey, Identifier fromTable, Identifier toTable) { + int updatedRecords = + execute( + err -> { + // NOTE(review): the message check presumably covers drivers that report + // a constraint violation only through the message text - confirm. + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null + && err.getMessage().contains("constraint failed"))) { + throw new RuntimeException( + String.format("Table already exists: %s", toTable), err); + } + }, + connections, + JdbcUtils.RENAME_TABLE_SQL, + toTable.getDatabaseName(), + toTable.getObjectName(), + storeKey, + fromTable.getDatabaseName(), + fromTable.getObjectName()); + + if (updatedRecords == 1) { + LOG.info("Renamed table from {}, to {}", fromTable, toTable); + } else if (updatedRecords == 0) { + throw new RuntimeException(String.format("Table does not exist: %s", fromTable)); + } else { + LOG.warn( + "Rename operation affected {} rows: the catalog table's primary key assumption has been violated", + updatedRecords); + } + } + + /** + * Checks whether a database is known to the catalog. + * + * @return {@code true} if the database owns at least one row in the catalog table or in the + * database properties table + */ + public static boolean databaseExists( + JdbcClientPool connections, String storeKey, String databaseName) { + return exists(connections, JdbcUtils.GET_DATABASE_SQL, storeKey, databaseName) + || exists( + connections, JdbcUtils.GET_DATABASE_PROPERTIES_SQL, storeKey, databaseName); + } + + public 
static boolean tableExists( + JdbcClientPool connections, String storeKey, String databaseName, String tableName) { + // The table exists exactly when the lookup query yields a row. + return exists(connections, JdbcUtils.GET_TABLE_SQL, storeKey, databaseName, tableName); + } + + /** + * Binds {@code args} as string parameters of {@code sql} and reports whether the query + * returns at least one row. + */ + @SuppressWarnings("checkstyle:NestedTryDepth") + private static boolean exists(JdbcClientPool connections, String sql, String... args) { + try { + return connections.run( + conn -> { + try (PreparedStatement statement = conn.prepareStatement(sql)) { + int position = 1; + for (String arg : args) { + statement.setString(position++, arg); + } + try (ResultSet rows = statement.executeQuery()) { + return rows.next(); + } + } + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } catch (SQLException e) { + throw new RuntimeException(String.format("Failed to execute exists query: %s", sql), e); + } + } + + public static int execute(JdbcClientPool connections, String sql, String... args) { + // No custom error handling: delegate with a handler that does nothing. + return execute(ignored -> {}, connections, sql, args); + } + + public static int execute( + Consumer sqlErrorHandler, + JdbcClientPool connections, + String sql, + String... 
args) { + try { + return connections.run( + conn -> { + try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { + for (int pos = 0; pos < args.length; pos++) { + preparedStatement.setString(pos + 1, args[pos]); + } + return preparedStatement.executeUpdate(); + } + }); + } catch (SQLException e) { + // Give the caller-supplied handler the chance to raise a more specific exception. + sqlErrorHandler.accept(e); + throw new RuntimeException(String.format("Failed to execute: %s", sql), e); + } catch (InterruptedException e) { + // Restore the interrupt flag so callers further up can still observe the interruption. + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL command", e); + } + } + + /** + * Inserts all given properties of a database with a single multi-row INSERT statement. + * + * @param connections client pool used to run the statement + * @param storeKey catalog key the rows are stored under + * @param databaseName database the properties belong to + * @param properties property key/value pairs to insert; an empty map is a no-op + * @return {@code true} when every property was inserted + * @throws IllegalStateException if the number of inserted rows differs from the map size + */ + public static boolean insertProperties( + JdbcClientPool connections, + String storeKey, + String databaseName, + Map properties) { + if (properties.isEmpty()) { + // Without any value tuple the generated statement would end in a bare "VALUES". + return true; + } + String[] args = + properties.entrySet().stream() + .flatMap( + entry -> + Stream.of( + storeKey, + databaseName, + entry.getKey(), + entry.getValue())) + .toArray(String[]::new); + + int insertedRecords = + execute(connections, JdbcUtils.insertPropertiesStatement(properties.size()), args); + if (insertedRecords == properties.size()) { + return true; + } + throw new IllegalStateException( + String.format( + "Failed to insert: %d of %d succeeded", + insertedRecords, properties.size())); + } + + private static String insertPropertiesStatement(int size) { + StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_DATABASE_PROPERTIES_SQL); + for (int i = 0; i < size; i++) { + if (i != 0) { + sqlStatement.append(", "); + } + sqlStatement.append(JdbcUtils.INSERT_PROPERTIES_VALUES_BASE); + } + return sqlStatement.toString(); + } + + public static void createDistributedLockTable(JdbcClientPool connections) + throws SQLException, InterruptedException { + DistributedLockDialectFactory.create(connections.getProtocol()).createTable(connections); + } + + public static boolean acquire( + JdbcClientPool connections, String lockId, long timeoutMillSeconds) + throws SQLException, InterruptedException { + JdbcDistributedLockDialect distributedLockDialect = + DistributedLockDialectFactory.create(connections.getProtocol()); 
+ // Release the lock record first if it has already timed out. + int affectedRows = distributedLockDialect.tryReleaseTimedOutLock(connections, lockId); + if (affectedRows > 0) { + LOG.debug("Successfully cleared {} lock records", affectedRows); + } + return distributedLockDialect.lockAcquire(connections, lockId, timeoutMillSeconds); + } + + /** + * Releases the lock identified by {@code lockId} through the dialect selected by the pool's + * JDBC protocol. + */ + public static void release(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException { + DistributedLockDialectFactory.create(connections.getProtocol()) + .releaseLock(connections, lockId); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java new file mode 100644 index 000000000000..206aa8cd77ad --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +/** Distributed lock implementation based on mysql table. 
*/ +public class MysqlDistributedLockDialect extends AbstractDistributedLockDialect { + + @Override + public String getCreateTableSql() { + return "CREATE TABLE " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + "(" + + JdbcUtils.LOCK_ID + + " VARCHAR(1000) NOT NULL," + + JdbcUtils.ACQUIRED_AT + + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL," + + JdbcUtils.EXPIRE_TIME + + " BIGINT DEFAULT 0 NOT NULL," + + "PRIMARY KEY (" + + JdbcUtils.LOCK_ID + + ")" + + ")"; + } + + @Override + public String getLockAcquireSql() { + return "INSERT INTO " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " (" + + JdbcUtils.LOCK_ID + + "," + + JdbcUtils.EXPIRE_TIME + + ") VALUES (?,?)"; + } + + @Override + public String getReleaseLockSql() { + return "DELETE FROM " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE " + + JdbcUtils.LOCK_ID + + " = ?"; + } + + @Override + public String getTryReleaseTimedOutLock() { + return "DELETE FROM " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE TIMESTAMPDIFF(SECOND, " + + JdbcUtils.ACQUIRED_AT + + ", NOW()) >" + + JdbcUtils.EXPIRE_TIME + + " and " + + JdbcUtils.LOCK_ID + + " = ?"; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java new file mode 100644 index 000000000000..602fdd1d625e --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +/** Distributed lock implementation based on sqlite table. */ +public class SqlLiteDistributedLockDialect extends AbstractDistributedLockDialect { + + @Override + public String getCreateTableSql() { + return "CREATE TABLE " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + "(" + + JdbcUtils.LOCK_ID + + " VARCHAR(1000) NOT NULL," + + JdbcUtils.ACQUIRED_AT + + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL," + + JdbcUtils.EXPIRE_TIME + + " BIGINT DEFAULT 0 NOT NULL," + + "PRIMARY KEY (" + + JdbcUtils.LOCK_ID + + ")" + + ")"; + } + + @Override + public String getLockAcquireSql() { + return "INSERT INTO " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " (" + + JdbcUtils.LOCK_ID + + "," + + JdbcUtils.EXPIRE_TIME + + ") VALUES (?,?)"; + } + + @Override + public String getReleaseLockSql() { + return "DELETE FROM " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE " + + JdbcUtils.LOCK_ID + + " = ?"; + } + + @Override + public String getTryReleaseTimedOutLock() { + return "DELETE FROM " + + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE strftime('%s', 'now') - strftime('%s', " + + JdbcUtils.ACQUIRED_AT + + ") > " + + JdbcUtils.EXPIRE_TIME + + " and " + + JdbcUtils.LOCK_ID + + " = ?"; + } +} diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 485ba553145d..cc2b3f063138 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.paimon.catalog.FileSystemCatalogFactory +org.apache.paimon.jdbc.JdbcCatalogFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java new file mode 100644 index 000000000000..a9a225cf173f --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.CatalogTestBase; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.CatalogOptions; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link JdbcCatalog}. */ +public class JdbcCatalogTest extends CatalogTestBase { + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + catalog = initCatalog("test-jdbc-catalog", Maps.newHashMap()); + } + + private JdbcCatalog initCatalog(String storeKey, Map props) { + Map properties = Maps.newHashMap(); + properties.put( + CatalogOptions.URI.key(), + "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + + properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + properties.put(CatalogOptions.WAREHOUSE.key(), warehouse); + properties.put(CatalogOptions.LOCK_ENABLED.key(), "true"); + properties.putAll(props); + JdbcCatalog jdbcCatalog = new JdbcCatalog(fileIO, storeKey, properties, warehouse); + return jdbcCatalog; + } + + @Test + public void testAcquireLockFail() throws SQLException, InterruptedException { + String lockId = "jdbc.testDb.testTable"; + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000)) + .isTrue(); + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000)) + .isFalse(); + } + + @Test + public void testCleanTimeoutLockAndAcquireLock() throws SQLException, InterruptedException { + String lockId = "jdbc.testDb.testTable"; + 
assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000)) + .isTrue(); + Thread.sleep(2000); + assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000)) + .isTrue(); + } + + @Test + @Override + public void testListDatabasesWhenNoDatabases() { + List databases = catalog.listDatabases(); + assertThat(databases).isEqualTo(new ArrayList<>()); + } + + @Test + public void testCheckIdentifierUpperCase() throws Exception { + catalog.createDatabase("test_db", false); + assertThatThrownBy( + () -> + catalog.createTable( + Identifier.create("TEST_DB", "new_table"), + DEFAULT_TABLE_SCHEMA, + false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Database name [TEST_DB] cannot contain upper case in the catalog."); + + assertThatThrownBy( + () -> + catalog.createTable( + Identifier.create("test_db", "NEW_TABLE"), + DEFAULT_TABLE_SCHEMA, + false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog."); + } +} diff --git a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java index bef414915dc8..37d8661d2ead 100644 --- a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java +++ b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java @@ -76,6 +76,7 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation("paimon-common", "org.apache.paimon"), new OptionsClassLocation("paimon-core", "org.apache.paimon.lookup"), new OptionsClassLocation("paimon-core", "org.apache.paimon.catalog"), + new OptionsClassLocation("paimon-core", "org.apache.paimon.jdbc"), new OptionsClassLocation( "paimon-flink/paimon-flink-common", "org.apache.paimon.flink"), new OptionsClassLocation(