diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index bc170ae933c97..4d12d96ed0af1 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -176,151 +176,6 @@ Using the table option facilitates the convenient definition of Hive table param Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table. For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process. - - -## Creating a Catalog with Filesystem Metastore - -{{< tabs "filesystem-metastore-example" >}} - -{{< tab "Flink" >}} - -The following Flink SQL registers and uses a Paimon catalog named `my_catalog`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. - -```sql -CREATE CATALOG my_catalog WITH ( - 'type' = 'paimon', - 'warehouse' = 'hdfs:///path/to/warehouse' -); - -USE CATALOG my_catalog; -``` - -You can define any default table options with the prefix `table-default.` for tables created in the catalog. - -{{< /tab >}} - -{{< tab "Spark3" >}} - -The following shell command registers a paimon catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. - -```bash -spark-sql ... \ - --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ - --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse -``` - -You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog. - -After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL. - -```sql -USE paimon.default; -``` - -{{< /tab >}} - -{{< /tabs >}} - -## Creating a Catalog with Hive Metastore - -By using Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables created in such catalog can also be accessed directly from Hive. - -To use Hive catalog, Database name, Table name and Field names should be **lower** case. - -{{< tabs "hive-metastore-example" >}} - -{{< tab "Flink" >}} - -Paimon Hive catalog in Flink relies on Flink Hive connector bundled jar. You should first download Hive connector bundled jar and add it to classpath. - -| Metastore version | Bundle Name | SQL Client JAR | -|:------------------|:--------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 2.3.0 - 3.1.3 | Flink Bundle | [Download](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) | -| 1.2.0 - x.x.x | Presto Bundle | [Download](https://repo.maven.apache.org/maven2/com/facebook/presto/hive/hive-apache/1.2.2-2/hive-apache-1.2.2-2.jar) | - -The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore. - -If your Hive requires security authentication such as Kerberos, LDAP, Ranger or you want the paimon table to be managed -by Apache Atlas(Setting 'hive.metastore.event.listeners' in hive-site.xml). 
You can specify the hive-conf-dir and -hadoop-conf-dir parameter to the hive-site.xml file path. - -```sql -CREATE CATALOG my_hive WITH ( - 'type' = 'paimon', - 'metastore' = 'hive', - -- 'uri' = 'thrift://:', default use 'hive.metastore.uris' in HiveConf - -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment - -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment - -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf -); - -USE CATALOG my_hive; -``` - -You can define any default table options with the prefix `table-default.` for tables created in the catalog. - -Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}). - -{{< /tab >}} - -{{< tab "Spark3" >}} - -Your Spark installation should be able to detect, or already contains Hive dependencies. See [here](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more information. - -The following shell command registers a Paimon Hive catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore. - -```bash -spark-sql ... \ - --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ - --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \ - --conf spark.sql.catalog.paimon.metastore=hive \ - --conf spark.sql.catalog.paimon.uri=thrift://: -``` - -You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog. - -After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL. - -```sql -USE paimon.default; -``` - -Also, you can create [SparkGenericCatalog]({{< ref "engines/spark" >}}). - -{{< /tab >}} - -{{< /tabs >}} - -> When using hive catalog to change incompatible column types through alter table, you need to configure `hive.metastore.disallow.incompatible.col.type.changes=false`. see [HIVE-17832](https://issues.apache.org/jira/browse/HIVE-17832). - -> If you are using Hive3, please disable Hive ACID: -> -> ```shell -> hive.strict.managed.tables=false -> hive.create.as.insert.only=false -> metastore.create.as.acid=false -> ``` - -### Setting Location in Properties - -If you are using an object storage , and you don't want that the location of paimon table/database is accessed by the filesystem of hive, -which may lead to the error such as "No FileSystem for scheme: s3a". -You can set location in the properties of table/database by the config of `location-in-properties`. See -[setting the location of table/database in properties ]({{< ref "maintenance/configurations#HiveCatalogOptions" >}}) - -### Synchronizing Partitions into Hive Metastore - -By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned table in Hive. Partition push-down will be carried out by filter push-down instead. - -If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore, please set the table property `metastore.partitioned-table` to true. Also see [CoreOptions]({{< ref "maintenance/configurations#CoreOptions" >}}). - -### Adding Parameters to a Hive Table - -Using the table option facilitates the convenient definition of Hive table parameters. -Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table. 
-For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
-
-
 ## Creating a Catalog with JDBC Metastore
 
 By using the Paimon JDBC catalog, changes to the catalog will be directly stored in relational databases such as MySQL, Postgres, etc.
@@ -353,8 +208,6 @@ You can define any connection parameters for a database with the prefix "jdbc.".
 
 You can define any default table options with the prefix `table-default.` for tables created in the catalog.
 
-Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
-
 {{< /tab >}}
 
 {{< /tabs >}}
diff --git a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
index 622a635f003e5..fb21e6db26351 100644
--- a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
@@ -26,6 +26,12 @@
+        <tr>
+            <td><h5>catalog-name</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Custom jdbc catalog name.</td>
+        </tr>
         <tr>
             <td><h5>initialize-catalog-tables</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/ClientPool.java b/paimon-core/src/main/java/org/apache/paimon/ClientPool.java
index 70ffdba794f12..82420ada43f75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/ClientPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/ClientPool.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon;
 
-/** Define client connection pool. */
+/** Source: [core/src/main/java/org/apache/iceberg/ClientPool.java]. */
 public interface ClientPool<C, E> {
     /** Action interface for client. */
     interface Action<R, C, E> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java b/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java
index 0c0a663d3afd2..0558b3f48d848 100644
--- a/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java
@@ -27,7 +27,7 @@
 
 import static org.apache.paimon.utils.Preconditions.checkState;
 
-/** A universal implementation for defining client connection pools. */
+/** Source: [core/src/main/java/org/apache/iceberg/ClientPoolImpl.java]. */
 public abstract class ClientPoolImpl<C, E extends Exception> implements Closeable, ClientPool<C, E> {
     private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class);
 
@@ -59,7 +59,6 @@ public <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException
             C client = get();
             try {
                 return action.run(client);
-
             } catch (Exception exc) {
                 if (retry && isConnectionException(exc)) {
                     try {
@@ -104,13 +103,12 @@ public void close() {
                     }
                 }
                 if (clients.isEmpty() && currentSize > 0) {
-                    // wake every second in case this missed the signal
                     synchronized (signal) {
+                        // wake every second in case this missed the signal
                         signal.wait(1000);
                     }
                 }
             }
-
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             LOG.warn("Interrupted while shutting down pool. Some clients may not be closed.", e);
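To make the pool semantics above concrete, here is a minimal sketch of how a caller borrows a connection through `run`. This is a hypothetical example, not part of the patch; it assumes the sqlite-jdbc driver that the new tests below already rely on is on the classpath.

```java
import org.apache.paimon.jdbc.JdbcClientPool;

import java.sql.PreparedStatement;
import java.util.Collections;

public class ClientPoolExample {
    public static void main(String[] args) throws Exception {
        // Two pooled connections; the properties map would normally carry
        // "jdbc."-prefixed options such as user and password.
        JdbcClientPool pool =
                new JdbcClientPool(2, "jdbc:sqlite::memory:", Collections.emptyMap());
        try {
            // run(action) borrows a client, applies the action, and returns the
            // client to the pool. Because the pool is constructed with retry
            // enabled, a SQLNonTransientConnectionException triggers one
            // reconnect-and-retry before the failure is surfaced.
            boolean gotRow =
                    pool.run(
                            conn -> {
                                try (PreparedStatement ps = conn.prepareStatement("SELECT 1")) {
                                    return ps.executeQuery().next();
                                }
                            });
            System.out.println("query returned a row: " + gotRow);
        } finally {
            pool.close();
        }
    }
}
```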
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java
new file mode 100644
index 0000000000000..f3469d0b5ba5a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/AbstractDistributedLockDialect.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/** Base class for jdbc distributed lock dialects; subclasses supply the dialect-specific SQL. */
+public abstract class AbstractDistributedLockDialect implements JdbcDistributedLockDialect {
+
+    @Override
+    public void createTable(JdbcClientPool connections) throws SQLException, InterruptedException {
+        connections.run(
+                conn -> {
+                    DatabaseMetaData dbMeta = conn.getMetaData();
+                    ResultSet tableExists =
+                            dbMeta.getTables(
+                                    null, null, JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME, null);
+                    if (tableExists.next()) {
+                        return true;
+                    }
+                    return conn.prepareStatement(getCreateTableSql()).execute();
+                });
+    }
+
+    public abstract String getCreateTableSql();
+
+    @Override
+    public boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds)
+            throws SQLException, InterruptedException {
+        return connections.run(
+                connection -> {
+                    try (PreparedStatement preparedStatement =
+                            connection.prepareStatement(getLockAcquireSql())) {
+                        preparedStatement.setString(1, lockId);
+                        preparedStatement.setLong(2, timeoutMillSeconds / 1000);
+                        return preparedStatement.executeUpdate() > 0;
+                    } catch (SQLException ex) {
+                        return false;
+                    }
+                });
+    }
+
+    public abstract String getLockAcquireSql();
+
+    @Override
+    public boolean releaseLock(JdbcClientPool connections, String lockId)
+            throws SQLException, InterruptedException {
+        return connections.run(
+                connection -> {
+                    try (PreparedStatement preparedStatement =
+                            connection.prepareStatement(getReleaseLockSql())) {
+                        preparedStatement.setString(1, lockId);
+                        return preparedStatement.executeUpdate() > 0;
+                    }
+                });
+    }
+
+    public abstract String getReleaseLockSql();
+
+    @Override
+    public int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId)
+            throws SQLException, InterruptedException {
+        return connections.run(
+                connection -> {
+                    try (PreparedStatement preparedStatement =
+                            connection.prepareStatement(getTryReleaseTimedOutLock())) {
+                        preparedStatement.setString(1, lockId);
+                        return preparedStatement.executeUpdate();
+                    }
+                });
+    }
+
+    public abstract String getTryReleaseTimedOutLock();
+}
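The base class above is a template: a concrete dialect only supplies four SQL strings, and all pooling and statement handling stays in one place. As an illustration of what a new backend would involve, here is a hypothetical PostgreSQL dialect. It is not part of this patch, and wiring it up would also need a matching entry in the `DistributedLockDialectFactory` introduced next.

```java
package org.apache.paimon.jdbc;

/** Hypothetical PostgreSQL dialect (illustration only, not part of this patch). */
public class PostgresDistributedLockDialect extends AbstractDistributedLockDialect {

    @Override
    public String getCreateTableSql() {
        return "CREATE TABLE "
                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
                + "("
                + JdbcUtils.LOCK_ID
                + " VARCHAR(1000) NOT NULL,"
                + JdbcUtils.ACQUIRED_AT
                + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
                + JdbcUtils.EXPIRE_TIME
                + " BIGINT DEFAULT 0 NOT NULL,"
                + "PRIMARY KEY ("
                + JdbcUtils.LOCK_ID
                + "))";
    }

    @Override
    public String getLockAcquireSql() {
        return "INSERT INTO "
                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
                + " ("
                + JdbcUtils.LOCK_ID
                + ","
                + JdbcUtils.EXPIRE_TIME
                + ") VALUES (?,?)";
    }

    @Override
    public String getReleaseLockSql() {
        return "DELETE FROM "
                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
                + " WHERE "
                + JdbcUtils.LOCK_ID
                + " = ?";
    }

    @Override
    public String getTryReleaseTimedOutLock() {
        // Only this statement is truly dialect-specific: PostgreSQL expresses
        // "seconds since acquisition" as EXTRACT(EPOCH FROM (NOW() - acquired_at)).
        return "DELETE FROM "
                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
                + " WHERE EXTRACT(EPOCH FROM (NOW() - "
                + JdbcUtils.ACQUIRED_AT
                + ")) > "
                + JdbcUtils.EXPIRE_TIME
                + " AND "
                + JdbcUtils.LOCK_ID
                + " = ?";
    }
}
```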
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
new file mode 100644
index 0000000000000..1978456386c04
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+/** Creates the distributed lock dialect that matches a jdbc connection protocol. */
+class DistributedLockDialectFactory {
+    static JdbcDistributedLockDialect create(String protocol) {
+        JdbcProtocol type = JdbcProtocol.valueOf(protocol.toUpperCase());
+        switch (type) {
+            case SQLITE:
+                return new SqlLiteDistributedLockDialect();
+            case MARIADB:
+            case MYSQL:
+                return new MysqlDistributedLockDialect();
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Distributed locks based on %s are not supported", protocol));
+        }
+    }
+
+    /** Supported jdbc protocols. */
+    enum JdbcProtocol {
+        SQLITE,
+        // MariaDB speaks the MySQL dialect.
+        MARIADB,
+        MYSQL;
+    }
+}
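A short sketch of how the factory resolves a dialect from the protocol part of a JDBC URL (the protocol string itself is extracted by `JdbcClientPool`, shown further down). Worth noting: a protocol missing from the `JdbcProtocol` enum, e.g. `postgresql`, fails inside `JdbcProtocol.valueOf` with an `IllegalArgumentException` before the `default` branch's `UnsupportedOperationException` can ever be reached.

```java
package org.apache.paimon.jdbc;

public class DialectResolutionExample {
    public static void main(String[] args) {
        // "jdbc:mysql://host:3306/db" carries protocol "mysql" -> MySQL dialect;
        // "mariadb" resolves to the same dialect, "sqlite" to the SQLite one.
        JdbcDistributedLockDialect dialect = DistributedLockDialectFactory.create("mysql");
        System.out.println(dialect.getClass().getSimpleName());
    }
}
```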
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 064b50554937f..4e09c0d44616a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.jdbc;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogLock;
 import org.apache.paimon.catalog.Identifier;
@@ -56,7 +57,6 @@
 import static org.apache.paimon.jdbc.JdbcUtils.execute;
 import static org.apache.paimon.jdbc.JdbcUtils.insertProperties;
 import static org.apache.paimon.jdbc.JdbcUtils.updateTable;
-import static org.apache.paimon.jdbc.JdbcUtils.updateTableMetadataLocation;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 
 /** Support jdbc catalog. */
@@ -96,6 +96,11 @@ protected JdbcCatalog(
         }
     }
 
+    @VisibleForTesting
+    public JdbcClientPool getConnections() {
+        return connections;
+    }
+
     /** Initialize catalog tables. */
     private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException {
         boolean initializeCatalogTables =
@@ -130,19 +135,10 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException
                                     .execute();
                         });
 
-            // Check and create distributed lock table.
-            connections.run(
-                    conn -> {
-                        DatabaseMetaData dbMeta = conn.getMetaData();
-                        ResultSet tableExists =
-                                dbMeta.getTables(
-                                        null, null, JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME, null);
-                        if (tableExists.next()) {
-                            return true;
-                        }
-                        return conn.prepareStatement(JdbcUtils.CREATE_DISTRIBUTED_LOCK_TABLE_SQL)
-                                .execute();
-                    });
+            // If lock is enabled, check and create the distributed lock table.
+            if (lockEnabled()) {
+                JdbcUtils.createDistributedLockTable(connections);
+            }
         }
     }
 
@@ -270,7 +266,6 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
                         sql.setString(1, catalogName);
                         sql.setString(2, identifier.getDatabaseName());
                         sql.setString(3, identifier.getObjectName());
-                        sql.setString(4, path.toString());
                         return sql.executeUpdate();
                     }
                 });
@@ -312,9 +307,6 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
                                 + " to underlying files.",
                         e);
             }
-            // Update table metadata
-            updateTableMetadataLocation(
-                    connections, catalogName, toTable, toPath.toString(), fromPath.toString());
         }
     } catch (Exception e) {
         throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e);
@@ -328,30 +320,8 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
             throw new RuntimeException(
                     String.format("Table is not exists {}", identifier.getFullName()));
         }
-        final SchemaManager schemaManager = getSchemaManager(identifier);
-        // first commit changes to underlying files
-        TableSchema schema = schemaManager.commitChanges(changes);
-        try {
-            String newMetadataLocation = getDataTableLocation(identifier).toString();
-            Map<String, String> tableMetadata =
-                    JdbcUtils.getTable(
-                            connections,
-                            catalogName,
-                            identifier.getDatabaseName(),
-                            identifier.getObjectName());
-            String oldMetadataLocation = tableMetadata.get(JdbcUtils.METADATA_LOCATION_PROP);
-            if (!newMetadataLocation.equals(oldMetadataLocation)) {
-                updateTableMetadataLocation(
-                        connections,
-                        catalogName,
-                        identifier,
-                        newMetadataLocation,
-                        oldMetadataLocation);
-            }
-        } catch (Exception te) {
-            schemaManager.deleteSchema(schema.id());
-            throw new RuntimeException(te);
-        }
+        SchemaManager schemaManager = getSchemaManager(identifier);
+        schemaManager.commitChanges(changes);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
index 899f5dc475c95..5fdb84b697533 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java
@@ -36,6 +36,7 @@ public String identifier() {
 
     @Override
     public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) {
-        return new JdbcCatalog(fileIO, null, context.options().toMap(), warehouse.getName());
+        String catalogName = context.options().get(JdbcCatalogOptions.CATALOG_NAME);
+        return new JdbcCatalog(fileIO, catalogName, context.options().toMap(), warehouse.getName());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java
index 14109d08e452a..7ac9fc89b977b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java
@@ -29,5 +29,11 @@
                     .defaultValue(false)
                     .withDescription("Enable automatic table creation.");
 
+    public static final ConfigOption<String> CATALOG_NAME =
+            ConfigOptions.key("catalog-name")
+                    .stringType()
+                    .defaultValue(null)
+                    .withDescription("Custom jdbc catalog name.");
+
     private JdbcCatalogOptions() {}
 }
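To show how the new option travels from configuration to the catalog, a sketch of the options a user might set. The keys come from this patch and from the existing catalog options (`lock.enabled` is assumed to be the key behind `CatalogOptions.LOCK_ENABLED`); every value is a placeholder. `JdbcCatalogFactory.create` now reads `catalog-name` from the context options and hands it to `JdbcCatalog` instead of the previous hard-coded `null`.

```java
import java.util.HashMap;
import java.util.Map;

public class CatalogNameExample {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("uri", "jdbc:mysql://example-host:3306/paimon"); // pooled via JdbcClientPool
        options.put("jdbc.user", "paimon");     // "jdbc."-prefixed keys are forwarded to the driver
        options.put("jdbc.password", "paimon");
        options.put("catalog-name", "my_jdbc_catalog");   // JdbcCatalogOptions.CATALOG_NAME
        options.put("initialize-catalog-tables", "true"); // create catalog tables on startup
        options.put("lock.enabled", "true");    // now also creates the distributed lock table
        options.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```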
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
index 3bcf2d6d03efa..1949b7d59226c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.jdbc;
 
 import org.apache.paimon.ClientPoolImpl;
-import org.apache.paimon.options.CatalogOptions;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -27,27 +26,27 @@
 import java.sql.SQLNonTransientConnectionException;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /** Client pool for jdbc. */
 public class JdbcClientPool extends ClientPoolImpl<Connection, SQLException> {
 
+    private static final Pattern PROTOCOL_PATTERN = Pattern.compile("jdbc:([^:]+):(.*)");
     private final String dbUrl;
     private final Map<String, String> properties;
-
-    public JdbcClientPool(String dbUrl, Map<String, String> props) {
-        this(
-                Integer.parseInt(
-                        props.getOrDefault(
-                                CatalogOptions.CLIENT_POOL_SIZE.key(),
-                                String.valueOf(CatalogOptions.CLIENT_POOL_SIZE.defaultValue()))),
-                dbUrl,
-                props);
-    }
+    private final String protocol;
 
     public JdbcClientPool(int poolSize, String dbUrl, Map<String, String> props) {
         super(poolSize, SQLNonTransientConnectionException.class, true);
         properties = props;
         this.dbUrl = dbUrl;
+        Matcher matcher = PROTOCOL_PATTERN.matcher(dbUrl);
+        if (matcher.matches()) {
+            this.protocol = matcher.group(1);
+        } else {
+            throw new RuntimeException("Invalid jdbc url: " + dbUrl);
+        }
     }
 
     @Override
@@ -67,6 +66,10 @@ protected Connection reconnect(Connection client) {
         return newClient();
     }
 
+    public String getProtocol() {
+        return protocol;
+    }
+
     @Override
     protected void close(Connection client) {
         try {
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java
new file mode 100644
index 0000000000000..a691aac2295fd
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcDistributedLockDialect.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import java.sql.SQLException;
+
+/** Jdbc distributed lock interface.
*/ +public interface JdbcDistributedLockDialect { + void createTable(JdbcClientPool connections) throws SQLException, InterruptedException; + + boolean lockAcquire(JdbcClientPool connections, String lockId, long timeoutMillSeconds) + throws SQLException, InterruptedException; + + boolean releaseLock(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException; + + int tryReleaseTimedOutLock(JdbcClientPool connections, String lockId) + throws SQLException, InterruptedException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 231f4d44a2983..ce408e415544e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -29,41 +29,19 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.Timestamp; -import java.util.Collections; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.function.Consumer; import java.util.stream.Stream; /** Util for jdbc catalog. */ public class JdbcUtils { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); - public static final String METADATA_LOCATION_PROP = "metadata_location"; - public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location"; public static final String CATALOG_TABLE_NAME = "paimon_tables"; public static final String CATALOG_NAME = "catalog_name"; public static final String TABLE_DATABASE = "database_name"; public static final String TABLE_NAME = "table_name"; - static final String DO_COMMIT_SQL = - "UPDATE " - + CATALOG_TABLE_NAME - + " SET " - + METADATA_LOCATION_PROP - + " = ? , " - + PREVIOUS_METADATA_LOCATION_PROP - + " = ? " - + " WHERE " - + CATALOG_NAME - + " = ? AND " - + TABLE_DATABASE - + " = ? AND " - + TABLE_NAME - + " = ? AND " - + METADATA_LOCATION_PROP - + " = ?"; static final String CREATE_CATALOG_TABLE = "CREATE TABLE " + CATALOG_TABLE_NAME @@ -74,10 +52,6 @@ public class JdbcUtils { + " VARCHAR(255) NOT NULL," + TABLE_NAME + " VARCHAR(255) NOT NULL," - + METADATA_LOCATION_PROP - + " VARCHAR(1000)," - + PREVIOUS_METADATA_LOCATION_PROP - + " VARCHAR(1000)," + " PRIMARY KEY (" + CATALOG_NAME + ", " @@ -166,12 +140,8 @@ public class JdbcUtils { + TABLE_DATABASE + ", " + TABLE_NAME - + ", " - + METADATA_LOCATION_PROP - + ", " - + PREVIOUS_METADATA_LOCATION_PROP + ") " - + " VALUES (?,?,?,?,null)"; + + " VALUES (?,?,?)"; // Catalog database Properties static final String DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties"; @@ -231,16 +201,6 @@ public class JdbcUtils { + " = ? AND " + DATABASE_NAME + " = ? "; - static final String DELETE_DATABASE_PROPERTIES_SQL = - "DELETE FROM " - + DATABASE_PROPERTIES_TABLE_NAME - + " WHERE " - + CATALOG_NAME - + " = ? AND " - + DATABASE_NAME - + " = ? AND " - + DATABASE_PROPERTY_KEY - + " IN "; static final String DELETE_ALL_DATABASE_PROPERTIES_SQL = "DELETE FROM " + DATABASE_PROPERTIES_TABLE_NAME @@ -249,16 +209,6 @@ public class JdbcUtils { + " = ? AND " + DATABASE_NAME + " = ?"; - static final String LIST_PROPERTY_DATABASES_SQL = - "SELECT DISTINCT " - + DATABASE_NAME - + " FROM " - + DATABASE_PROPERTIES_TABLE_NAME - + " WHERE " - + CATALOG_NAME - + " = ? 
AND " - + DATABASE_NAME - + " LIKE ?"; static final String LIST_ALL_PROPERTY_DATABASES_SQL = "SELECT DISTINCT " + DATABASE_NAME @@ -270,42 +220,9 @@ public class JdbcUtils { // Distributed locks table static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; - static final String LOCK_NAME = "lock_name"; + static final String LOCK_ID = "lock_id"; static final String ACQUIRED_AT = "acquired_at"; - - static final String CREATE_DISTRIBUTED_LOCK_TABLE_SQL = - "CREATE TABLE " - + DISTRIBUTED_LOCKS_TABLE_NAME - + "(" - + LOCK_NAME - + " VARCHAR(1000) NOT NULL," - + ACQUIRED_AT - + " TIMESTAMP NULL DEFAULT NULL," - + "PRIMARY KEY (" - + LOCK_NAME - + ")" - + ")"; - - static final String DISTRIBUTED_LOCK_ACQUIRE_SQL = - "INSERT INTO " - + DISTRIBUTED_LOCKS_TABLE_NAME - + " (" - + LOCK_NAME - + ", " - + ACQUIRED_AT - + ") VALUES (?, ?)"; - - static final String DISTRIBUTED_LOCK_RELEASE_SQL = - "DELETE FROM " + DISTRIBUTED_LOCKS_TABLE_NAME + " WHERE " + LOCK_NAME + " = ?"; - - static final String DISTRIBUTED_LOCK_EXPIRE_CLEAR_SQL = - "DELETE FROM " - + DISTRIBUTED_LOCKS_TABLE_NAME - + " WHERE " - + LOCK_NAME - + " = ? AND " - + ACQUIRED_AT - + " < ?"; + static final String EXPIRE_TIME = "expire_time_seconds"; public static Properties extractJdbcConfiguration( Map properties, String prefix) { @@ -336,10 +253,6 @@ public static Map getTable( table.put(CATALOG_NAME, rs.getString(CATALOG_NAME)); table.put(TABLE_DATABASE, rs.getString(TABLE_DATABASE)); table.put(TABLE_NAME, rs.getString(TABLE_NAME)); - table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP)); - table.put( - PREVIOUS_METADATA_LOCATION_PROP, - rs.getString(PREVIOUS_METADATA_LOCATION_PROP)); } rs.close(); } @@ -355,8 +268,6 @@ public static void updateTable( int updatedRecords = execute( err -> { - // SQLite doesn't set SQLState or throw - // SQLIntegrityConstraintViolationException if (err instanceof SQLIntegrityConstraintViolationException || (err.getMessage() != null && err.getMessage().contains("constraint failed"))) { @@ -383,40 +294,6 @@ public static void updateTable( } } - /** Update table metadata location. 
*/ - public static void updateTableMetadataLocation( - JdbcClientPool connections, - String catalogName, - Identifier identifier, - String newMetadataLocation, - String oldMetadataLocation) - throws SQLException, InterruptedException { - int updatedRecords = - connections.run( - conn -> { - try (PreparedStatement sql = - conn.prepareStatement(JdbcUtils.DO_COMMIT_SQL)) { - // UPDATE - sql.setString(1, newMetadataLocation); - sql.setString(2, oldMetadataLocation); - // WHERE - sql.setString(3, catalogName); - sql.setString(4, identifier.getDatabaseName()); - sql.setString(5, identifier.getObjectName()); - sql.setString(6, oldMetadataLocation); - return sql.executeUpdate(); - } - }); - if (updatedRecords == 1) { - LOG.debug("Successfully committed to existing table: {}", identifier.getFullName()); - } else { - throw new RuntimeException( - String.format( - "Failed to update table %s from catalog %s", - identifier.getFullName(), catalogName)); - } - } - public static boolean databaseExists( JdbcClientPool connections, String catalogName, String databaseName) { @@ -517,39 +394,6 @@ public static boolean insertProperties( insertedRecords, properties.size())); } - public static boolean updateProperties( - JdbcClientPool connections, - String catalogName, - String databaseName, - Map properties) { - Stream caseArgs = - properties.entrySet().stream() - .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue())); - Stream whereArgs = - Stream.concat(Stream.of(catalogName, databaseName), properties.keySet().stream()); - String[] args = Stream.concat(caseArgs, whereArgs).toArray(String[]::new); - int updatedRecords = - execute(connections, JdbcUtils.updatePropertiesStatement(properties.size()), args); - if (updatedRecords == properties.size()) { - return true; - } - throw new IllegalStateException( - String.format( - "Failed to update: %d of %d succeeded", updatedRecords, properties.size())); - } - - public static boolean deleteProperties( - JdbcClientPool connections, - String catalogName, - String databaseName, - Set properties) { - String[] args = - Stream.concat(Stream.of(catalogName, databaseName), properties.stream()) - .toArray(String[]::new); - - return execute(connections, JdbcUtils.deletePropertiesStatement(properties), args) > 0; - } - private static String insertPropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_DATABASE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { @@ -561,86 +405,27 @@ private static String insertPropertiesStatement(int size) { return sqlStatement.toString(); } - private static String deletePropertiesStatement(Set properties) { - StringBuilder sqlStatement = new StringBuilder(JdbcUtils.DELETE_DATABASE_PROPERTIES_SQL); - String values = - String.join(",", Collections.nCopies(properties.size(), String.valueOf('?'))); - sqlStatement.append("(").append(values).append(")"); - - return sqlStatement.toString(); - } - - private static String updatePropertiesStatement(int size) { - StringBuilder sqlStatement = - new StringBuilder( - "UPDATE " - + DATABASE_PROPERTIES_TABLE_NAME - + " SET " - + DATABASE_PROPERTY_VALUE - + " = CASE"); - for (int i = 0; i < size; i += 1) { - sqlStatement.append(" WHEN " + DATABASE_PROPERTY_KEY + " = ? THEN ?"); - } - sqlStatement.append( - " END WHERE " - + CATALOG_NAME - + " = ? AND " - + DATABASE_NAME - + " = ? 
AND " - + DATABASE_PROPERTY_KEY - + " IN "); - - String values = String.join(",", Collections.nCopies(size, String.valueOf('?'))); - sqlStatement.append("(").append(values).append(")"); - return sqlStatement.toString(); + public static void createDistributedLockTable(JdbcClientPool connections) + throws SQLException, InterruptedException { + DistributedLockDialectFactory.create(connections.getProtocol()).createTable(connections); } - public static boolean acquire(JdbcClientPool connections, String lockName, long timeout) + public static boolean acquire( + JdbcClientPool connections, String lockId, long timeoutMillSeconds) throws SQLException, InterruptedException { - // Check and clear expire lock - int affectedRows = tryClearExpireLock(connections, lockName, timeout); + JdbcDistributedLockDialect jdbcDistributedLockTable = + DistributedLockDialectFactory.create(connections.getProtocol()); + // Check and clear expire lock. + int affectedRows = jdbcDistributedLockTable.tryReleaseTimedOutLock(connections, lockId); if (affectedRows > 0) { LOG.debug("Successfully cleared " + affectedRows + " lock records"); } - return connections.run( - connection -> { - try (PreparedStatement preparedStatement = - connection.prepareStatement(DISTRIBUTED_LOCK_ACQUIRE_SQL)) { - preparedStatement.setString(1, lockName); - preparedStatement.setTimestamp( - 2, new Timestamp(System.currentTimeMillis())); - return preparedStatement.executeUpdate() > 0; - } catch (SQLException e) { - LOG.error("Try acquire lock failed.", e); - return false; - } - }); - } - - public static void release(JdbcClientPool connections, String lockName) - throws SQLException, InterruptedException { - connections.run( - connection -> { - try (PreparedStatement preparedStatement = - connection.prepareStatement(DISTRIBUTED_LOCK_RELEASE_SQL)) { - preparedStatement.setString(1, lockName); - return preparedStatement.executeUpdate() > 0; - } - }); + return jdbcDistributedLockTable.lockAcquire(connections, lockId, timeoutMillSeconds); } - private static int tryClearExpireLock(JdbcClientPool connections, String lockName, long timeout) + public static void release(JdbcClientPool connections, String lockId) throws SQLException, InterruptedException { - long expirationTimeMillis = System.currentTimeMillis() - timeout * 1000; - Timestamp expirationTimestamp = new Timestamp(expirationTimeMillis); - return connections.run( - connection -> { - try (PreparedStatement preparedStatement = - connection.prepareStatement(DISTRIBUTED_LOCK_EXPIRE_CLEAR_SQL)) { - preparedStatement.setString(1, lockName); - preparedStatement.setTimestamp(2, expirationTimestamp); - return preparedStatement.executeUpdate(); - } - }); + DistributedLockDialectFactory.create(connections.getProtocol()) + .releaseLock(connections, lockId); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java new file mode 100644 index 0000000000000..903108762cc79 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/MysqlDistributedLockDialect.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+/** Distributed lock implementation based on the mysql lock table. */
+public class MysqlDistributedLockDialect extends AbstractDistributedLockDialect {
+
+    @Override
+    public String getCreateTableSql() {
+        return "CREATE TABLE "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + "("
+                + JdbcUtils.LOCK_ID
+                + " VARCHAR(1000) NOT NULL,"
+                + JdbcUtils.ACQUIRED_AT
+                + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+                + JdbcUtils.EXPIRE_TIME
+                + " BIGINT DEFAULT 0 NOT NULL,"
+                + "PRIMARY KEY ("
+                + JdbcUtils.LOCK_ID
+                + ")"
+                + ")";
+    }
+
+    @Override
+    public String getLockAcquireSql() {
+        return "INSERT INTO "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + " ("
+                + JdbcUtils.LOCK_ID
+                + ","
+                + JdbcUtils.EXPIRE_TIME
+                + ") VALUES (?,?)";
+    }
+
+    @Override
+    public String getReleaseLockSql() {
+        return "DELETE FROM "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + " WHERE "
+                + JdbcUtils.LOCK_ID
+                + " = ?";
+    }
+
+    @Override
+    public String getTryReleaseTimedOutLock() {
+        return "DELETE FROM "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + " WHERE TIMESTAMPDIFF(SECOND, "
+                + JdbcUtils.ACQUIRED_AT
+                + ", NOW()) > "
+                + JdbcUtils.EXPIRE_TIME
+                + " AND "
+                + JdbcUtils.LOCK_ID
+                + " = ?";
+    }
+}
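The MySQL statement above and the SQLite statement below implement the same expiry rule with different date functions. The arithmetic is worth spelling out, since `lockAcquire` stores the timeout in whole seconds; a sketch with made-up values:

```java
public class LockExpiryExample {
    public static void main(String[] args) {
        long timeoutMillSeconds = 5_000; // value handed to lockAcquire
        long expireTimeSeconds = timeoutMillSeconds / 1000; // stored in expire_time_seconds -> 5
        // MySQL:  DELETE ... WHERE TIMESTAMPDIFF(SECOND, acquired_at, NOW()) > 5 AND lock_id = ?
        // SQLite: DELETE ... WHERE strftime('%s','now') - strftime('%s', acquired_at) > 5 AND lock_id = ?
        // Until the holder has been silent for more than five seconds, a competing
        // lockAcquire INSERT violates the lock_id primary key and reports false.
        // Note the integer division: a timeout below 1000 ms stores 0, so the lock
        // becomes stealable as soon as acquired_at is at least one second old.
        System.out.println("lock becomes stealable after " + expireTimeSeconds + " s");
    }
}
```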
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java
new file mode 100644
index 0000000000000..602fdd1d625e2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/SqlLiteDistributedLockDialect.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+/** Distributed lock implementation based on the sqlite lock table. */
+public class SqlLiteDistributedLockDialect extends AbstractDistributedLockDialect {
+
+    @Override
+    public String getCreateTableSql() {
+        return "CREATE TABLE "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + "("
+                + JdbcUtils.LOCK_ID
+                + " VARCHAR(1000) NOT NULL,"
+                + JdbcUtils.ACQUIRED_AT
+                + " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
+                + JdbcUtils.EXPIRE_TIME
+                + " BIGINT DEFAULT 0 NOT NULL,"
+                + "PRIMARY KEY ("
+                + JdbcUtils.LOCK_ID
+                + ")"
+                + ")";
+    }
+
+    @Override
+    public String getLockAcquireSql() {
+        return "INSERT INTO "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + " ("
+                + JdbcUtils.LOCK_ID
+                + ","
+                + JdbcUtils.EXPIRE_TIME
+                + ") VALUES (?,?)";
+    }
+
+    @Override
+    public String getReleaseLockSql() {
+        return "DELETE FROM "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + " WHERE "
+                + JdbcUtils.LOCK_ID
+                + " = ?";
+    }
+
+    @Override
+    public String getTryReleaseTimedOutLock() {
+        return "DELETE FROM "
+                + JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
+                + " WHERE strftime('%s', 'now') - strftime('%s', "
+                + JdbcUtils.ACQUIRED_AT
+                + ") > "
+                + JdbcUtils.EXPIRE_TIME
+                + " AND "
+                + JdbcUtils.LOCK_ID
+                + " = ?";
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index 5afa4d2601ea9..13ed1f979165f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -27,6 +27,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -53,12 +54,33 @@ private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
         properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
         properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
         properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
+        properties.put(JdbcCatalogOptions.INITIALIZE_CATALOG_TABLES.key(), "true");
         properties.putAll(props);
         JdbcCatalog jdbcCatalog = new JdbcCatalog(fileIO, catalogName, properties, warehouse);
         return jdbcCatalog;
     }
 
+    @Test
+    public void testAcquireLockFail() throws SQLException, InterruptedException {
+        String lockId = "jdbc.testDb.testTable";
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000))
+                .isTrue();
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000))
+                .isFalse();
+    }
+
+    @Test
+    public void testCleanTimeoutLockAndAcquireLock() throws SQLException, InterruptedException {
+        String lockId = "jdbc.testDb.testTable";
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000))
+                .isTrue();
+        Thread.sleep(2000);
+        assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000))
+                .isTrue();
+    }
+
     @Test
     @Override
     public void testListDatabasesWhenNoDatabases() {
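Finally, a sketch that ties the pieces together outside of JUnit, mirroring the two tests above. It assumes the sqlite-jdbc driver on the classpath, as in `JdbcCatalogTest`; the database path is a placeholder.

```java
import org.apache.paimon.jdbc.JdbcClientPool;
import org.apache.paimon.jdbc.JdbcUtils;

import java.util.Collections;

public class LockRoundTripExample {
    public static void main(String[] args) throws Exception {
        JdbcClientPool connections =
                new JdbcClientPool(2, "jdbc:sqlite:/tmp/paimon-lock-demo.db", Collections.emptyMap());
        try {
            // Picks the SQLite dialect from the "sqlite" protocol in the URL.
            JdbcUtils.createDistributedLockTable(connections);

            String lockId = "jdbc.testDb.testTable";
            System.out.println(JdbcUtils.acquire(connections, lockId, 1000)); // true: fresh lock
            System.out.println(JdbcUtils.acquire(connections, lockId, 1000)); // false: still held
            Thread.sleep(2000); // let the one-second expire_time_seconds lapse
            System.out.println(JdbcUtils.acquire(connections, lockId, 1000)); // true: stale lock stolen
            JdbcUtils.release(connections, lockId);
        } finally {
            connections.close();
        }
    }
}
```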