From 4fd21acf2fa0abd8ca9b46fe6e10382d5b9598cb Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Fri, 2 Feb 2024 23:37:44 +0800 Subject: [PATCH] Support jdbc catalog --- docs/content/how-to/creating-catalogs.md | 183 +++++ docs/content/maintenance/configurations.md | 6 + .../generated/catalog_configuration.html | 8 +- .../generated/jdbc_catalog_configuration.html | 36 + .../apache/paimon/options/CatalogOptions.java | 9 +- paimon-core/pom.xml | 7 + .../java/org/apache/paimon/ClientPool.java | 31 + .../org/apache/paimon/ClientPoolImpl.java | 157 +++++ .../org/apache/paimon/jdbc/JdbcCatalog.java | 468 +++++++++++++ .../paimon/jdbc/JdbcCatalogFactory.java | 41 ++ .../apache/paimon/jdbc/JdbcCatalogLock.java | 132 ++++ .../paimon/jdbc/JdbcCatalogOptions.java | 33 + .../apache/paimon/jdbc/JdbcClientPool.java | 78 +++ .../org/apache/paimon/jdbc/JdbcUtils.java | 646 ++++++++++++++++++ .../org.apache.paimon.factories.Factory | 1 + .../apache/paimon/jdbc/JdbcCatalogTest.java | 90 +++ .../ConfigOptionsDocGenerator.java | 1 + 17 files changed, 1925 insertions(+), 2 deletions(-) create mode 100644 docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html create mode 100644 paimon-core/src/main/java/org/apache/paimon/ClientPool.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java diff --git a/docs/content/how-to/creating-catalogs.md b/docs/content/how-to/creating-catalogs.md index a3de5df31e566..bc170ae933c97 100644 --- a/docs/content/how-to/creating-catalogs.md +++ b/docs/content/how-to/creating-catalogs.md @@ -30,6 +30,7 @@ Paimon catalogs currently support two types of metastores: * `filesystem` metastore (default), which stores both metadata and table files in filesystems. * `hive` metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive. +* `jdbc` metastore, which additionally stores metadata in relational databases such as MySQL, Postgres, etc. See [CatalogOptions]({{< ref "maintenance/configurations#catalogoptions" >}}) for detailed options when creating a catalog. @@ -175,3 +176,185 @@ Using the table option facilitates the convenient definition of Hive table param Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table. For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process. + + +## Creating a Catalog with Filesystem Metastore + +{{< tabs "filesystem-metastore-example" >}} + +{{< tab "Flink" >}} + +The following Flink SQL registers and uses a Paimon catalog named `my_catalog`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. 
+
+```sql
+CREATE CATALOG my_catalog WITH (
+    'type' = 'paimon',
+    'warehouse' = 'hdfs:///path/to/warehouse'
+);
+
+USE CATALOG my_catalog;
+```
+
+You can define any default table options with the prefix `table-default.` for tables created in the catalog.
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+The following shell command registers a Paimon catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
+
+```bash
+spark-sql ... \
+    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse
+```
+
+You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
+
+After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
+
+```sql
+USE paimon.default;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Creating a Catalog with Hive Metastore
+
+By using a Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables created in such a catalog can also be accessed directly from Hive.
+
+To use the Hive catalog, database names, table names and field names should be **lowercase**.
+
+{{< tabs "hive-metastore-example" >}}
+
+{{< tab "Flink" >}}
+
+The Paimon Hive catalog in Flink relies on the Flink Hive connector bundled jar. You should first download the bundled jar and add it to the classpath.
+
+| Metastore version | Bundle Name   | SQL Client JAR |
+|:------------------|:--------------|:---------------|
+| 2.3.0 - 3.1.3     | Flink Bundle  | [Download](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) |
+| 1.2.0 - x.x.x     | Presto Bundle | [Download](https://repo.maven.apache.org/maven2/com/facebook/presto/hive/hive-apache/1.2.2-2/hive-apache-1.2.2-2.jar) |
+
+The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
+
+If your Hive requires security authentication such as Kerberos, LDAP or Ranger, or if you want Paimon tables to be managed
+by Apache Atlas (by setting 'hive.metastore.event.listeners' in hive-site.xml), you can point the `hive-conf-dir` and
+`hadoop-conf-dir` parameters to the directory containing your hive-site.xml file.
+
+```sql
+CREATE CATALOG my_hive WITH (
+    'type' = 'paimon',
+    'metastore' = 'hive',
+    -- 'uri' = 'thrift://<hostname>:<port>', default uses 'hive.metastore.uris' in HiveConf
+    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
+    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
+    -- 'warehouse' = 'hdfs:///path/to/warehouse', default uses 'hive.metastore.warehouse.dir' in HiveConf
+);
+
+USE CATALOG my_hive;
+```
+
+You can define any default table options with the prefix `table-default.` for tables created in the catalog.
+
+Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+Your Spark installation should be able to detect, or already contain, Hive dependencies. See [here](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more information.
+
+The following shell command registers a Paimon Hive catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
+
+```bash
+spark-sql ... \
+    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
+    --conf spark.sql.catalog.paimon.metastore=hive \
+    --conf spark.sql.catalog.paimon.uri=thrift://<hostname>:<port>
+```
+
+You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
+
+After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
+
+```sql
+USE paimon.default;
+```
+
+Also, you can create [SparkGenericCatalog]({{< ref "engines/spark" >}}).
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+> When using the Hive catalog to change incompatible column types through `ALTER TABLE`, you need to configure `hive.metastore.disallow.incompatible.col.type.changes=false`. See [HIVE-17832](https://issues.apache.org/jira/browse/HIVE-17832).
+
+> If you are using Hive3, please disable Hive ACID:
+>
+> ```shell
+> hive.strict.managed.tables=false
+> hive.create.as.insert.only=false
+> metastore.create.as.acid=false
+> ```
+
+### Setting Location in Properties
+
+If you are using object storage and you do not want the location of Paimon tables/databases to be accessed by Hive's filesystem,
+which may lead to errors such as "No FileSystem for scheme: s3a",
+you can set the location in the properties of the table/database via the `location-in-properties` configuration. See
+[setting the location of table/database in properties]({{< ref "maintenance/configurations#HiveCatalogOptions" >}}).
+
+### Synchronizing Partitions into Hive Metastore
+
+By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned table in Hive; partition filtering is instead carried out by filter push-down.
+
+If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore, please set the table property `metastore.partitioned-table` to true. Also see [CoreOptions]({{< ref "maintenance/configurations#CoreOptions" >}}).
+
+### Adding Parameters to a Hive Table
+
+Using the table option facilitates the convenient definition of Hive table parameters.
+Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
+For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
+
+## Creating a Catalog with JDBC Metastore
+
+By using the Paimon JDBC catalog, changes to the catalog are directly stored in a relational database such as MySQL or PostgreSQL.
+
+{{< tabs "jdbc-metastore-example" >}}
+
+{{< tab "Flink" >}}
+
+The Paimon JDBC catalog in Flink requires the JDBC driver jar for the backing database, such as MySQL or PostgreSQL. Download the driver and add it to the classpath:
+
+| Database type | Bundle Name          | SQL Client JAR |
+|:--------------|:---------------------|:---------------|
+| mysql         | mysql-connector-java | [Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) |
+| postgres      | postgresql           | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
+
+```sql
+CREATE CATALOG my_jdbc WITH (
+    'type' = 'paimon',
+    'metastore' = 'jdbc',
+    -- 'uri' = 'jdbc:mysql://<host>:<port>/<database_name>',
+    -- 'jdbc.user' = '...',
+    -- 'jdbc.password' = '...',
+    -- 'initialize-catalog-tables' = 'true',
+    -- 'warehouse' = 'hdfs:///path/to/warehouse',
+);
+
+USE CATALOG my_jdbc;
+```
+
+You can define any connection parameters for the database with the prefix `jdbc.`.
+
+You can define any default table options with the prefix `table-default.` for tables created in the catalog.
+
+Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}). A hypothetical Spark registration is sketched below, after the generated configuration tables.
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md
index f48956462b6a5..87d59cd34eff2 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -50,6 +50,12 @@ Options for Hive catalog.
 
 {{< generated/hive_catalog_configuration >}}
 
+### JdbcCatalogOptions
+
+Options for JDBC catalog.
+
+{{< generated/jdbc_catalog_configuration >}}
+
 ### FlinkCatalogOptions
 
 Flink catalog options for paimon.
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index d243737a8353a..e685559447e2d 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@
         </tr>
+        <tr>
+            <td><h5>client-pool-size</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>Configure the size of the connection pool.</td>
+        </tr>
         <tr>
             <td><h5>fs.allow-hadoop-fallback</h5></td>
             <td style="word-wrap: break-word;">true</td>
@@ -60,7 +66,7 @@
             <td><h5>metastore</h5></td>
             <td style="word-wrap: break-word;">"filesystem"</td>
             <td>String</td>
-            <td>Metastore of paimon catalog, supports filesystem and hive.</td>
+            <td>Metastore of paimon catalog, supports filesystem, hive and jdbc.</td>
         </tr>
         <tr>
             <td><h5>table.type</h5></td>
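
The following is a hypothetical Spark registration for the same JDBC catalog. It is a sketch only, assuming that the `spark.sql.catalog.paimon.*` prefix forwards catalog options (including the `jdbc.` connection parameters) the same way as in the Hive example above:

```bash
spark-sql ... \
    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
    --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
    --conf spark.sql.catalog.paimon.metastore=jdbc \
    --conf spark.sql.catalog.paimon.uri=jdbc:mysql://<host>:<port>/<database_name> \
    --conf spark.sql.catalog.paimon.jdbc.user=... \
    --conf spark.sql.catalog.paimon.jdbc.password=...
```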
diff --git a/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
new file mode 100644
index 0000000000000..622a635f003e5
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
@@ -0,0 +1,36 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>initialize-catalog-tables</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable automatic table creation.</td>
+        </tr>
+    </tbody>
+</table>
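
For reference, when `initialize-catalog-tables` is set to `true` the catalog creates three backing tables on first use. The DDL below is a sketch reconstructed from the `JdbcUtils` constants introduced later in this patch:

```sql
-- One row per Paimon table, tracking the current and previous metadata location.
CREATE TABLE paimon_tables (
    catalog_name VARCHAR(255) NOT NULL,
    database_name VARCHAR(255) NOT NULL,
    table_name VARCHAR(255) NOT NULL,
    metadata_location VARCHAR(1000),
    previous_metadata_location VARCHAR(1000),
    PRIMARY KEY (catalog_name, database_name, table_name)
);

-- Key/value properties per database.
CREATE TABLE paimon_database_properties (
    catalog_name VARCHAR(255) NOT NULL,
    database_name VARCHAR(255) NOT NULL,
    property_key VARCHAR(255),
    property_value VARCHAR(1000),
    PRIMARY KEY (catalog_name, database_name, property_key)
);

-- Lock rows used by the JDBC catalog lock.
CREATE TABLE paimon_distributed_locks (
    lock_name VARCHAR(1000) NOT NULL,
    acquired_at TIMESTAMP NULL DEFAULT NULL,
    PRIMARY KEY (lock_name)
);
```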
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index 0fba499dd13ba..42cd9e4188440 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -39,7 +39,8 @@ public class CatalogOptions { ConfigOptions.key("metastore") .stringType() .defaultValue("filesystem") - .withDescription("Metastore of paimon catalog, supports filesystem and hive."); + .withDescription( + "Metastore of paimon catalog, supports filesystem、hive and jdbc."); public static final ConfigOption URI = ConfigOptions.key("uri") @@ -78,6 +79,12 @@ public class CatalogOptions { .withDescription( "Allow to fallback to hadoop File IO when no file io found for the scheme."); + public static final ConfigOption CLIENT_POOL_SIZE = + key("client-pool-size") + .intType() + .defaultValue(2) + .withDescription("Configure the size of the connection pool."); + public static final ConfigOption LINEAGE_META = key("lineage-meta") .stringType() diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index 0a0effb15fea8..f4e028bdcb190 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -183,6 +183,13 @@ under the License. 3.6.1 + + org.xerial + sqlite-jdbc + 3.44.0.0 + test + + diff --git a/paimon-core/src/main/java/org/apache/paimon/ClientPool.java b/paimon-core/src/main/java/org/apache/paimon/ClientPool.java new file mode 100644 index 0000000000000..70ffdba794f12 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/ClientPool.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon; + +/** Define client connection pool. */ +public interface ClientPool { + /** Action interface for client. */ + interface Action { + R run(C client) throws E; + } + + R run(Action action) throws E, InterruptedException; + + R run(Action action, boolean retry) throws E, InterruptedException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java b/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java new file mode 100644 index 0000000000000..0c0a663d3afd2 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/ClientPoolImpl.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.ArrayDeque; +import java.util.Deque; + +import static org.apache.paimon.utils.Preconditions.checkState; + +/** A universal implementation for defining client connection pools. */ +public abstract class ClientPoolImpl + implements Closeable, ClientPool { + private static final Logger LOG = LoggerFactory.getLogger(ClientPoolImpl.class); + + private final int poolSize; + private final Deque clients; + private final Class reconnectExc; + private final Object signal = new Object(); + private final boolean retryByDefault; + private volatile int currentSize; + private boolean closed; + + public ClientPoolImpl(int poolSize, Class reconnectExc, boolean retryByDefault) { + this.poolSize = poolSize; + this.reconnectExc = reconnectExc; + this.clients = new ArrayDeque<>(poolSize); + this.currentSize = 0; + this.closed = false; + this.retryByDefault = retryByDefault; + } + + @Override + public R run(Action action) throws E, InterruptedException { + return run(action, retryByDefault); + } + + @Override + public R run(Action action, boolean retry) throws E, InterruptedException { + C client = get(); + try { + return action.run(client); + + } catch (Exception exc) { + if (retry && isConnectionException(exc)) { + try { + client = reconnect(client); + } catch (Exception ignored) { + // if reconnection throws any exception, rethrow the original failure + throw reconnectExc.cast(exc); + } + + return action.run(client); + } + + throw exc; + + } finally { + release(client); + } + } + + protected abstract C newClient(); + + protected abstract C reconnect(C client); + + protected boolean isConnectionException(Exception exc) { + return reconnectExc.isInstance(exc); + } + + protected abstract void close(C client); + + @Override + public void close() { + this.closed = true; + try { + while (currentSize > 0) { + if (!clients.isEmpty()) { + synchronized (this) { + if (!clients.isEmpty()) { + C client = clients.removeFirst(); + close(client); + currentSize -= 1; + } + } + } + if (clients.isEmpty() && currentSize > 0) { + // wake every second in case this missed the signal + synchronized (signal) { + signal.wait(1000); + } + } + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while shutting down pool. 
Some clients may not be closed.", e); + } + } + + private C get() throws InterruptedException { + checkState(!closed, "Cannot get a client from a closed pool"); + while (true) { + if (!clients.isEmpty() || currentSize < poolSize) { + synchronized (this) { + if (!clients.isEmpty()) { + return clients.removeFirst(); + } else if (currentSize < poolSize) { + C client = newClient(); + currentSize += 1; + return client; + } + } + } + synchronized (signal) { + // wake every second in case this missed the signal + signal.wait(1000); + } + } + } + + private void release(C client) { + synchronized (this) { + clients.addFirst(client); + } + synchronized (signal) { + signal.notify(); + } + } + + public int poolSize() { + return poolSize; + } + + public boolean isClosed() { + return closed; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java new file mode 100644 index 0000000000000..baa92f5fe096a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.Lock; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.AbstractMap; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; +import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; +import static org.apache.paimon.jdbc.JdbcCatalogOptions.INITIALIZE_CATALOG_TABLES; +import static org.apache.paimon.jdbc.JdbcUtils.execute; +import static org.apache.paimon.jdbc.JdbcUtils.insertProperties; +import static org.apache.paimon.jdbc.JdbcUtils.updateTable; +import static org.apache.paimon.jdbc.JdbcUtils.updateTableMetadataLocation; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; + +/** Support jdbc catalog. */ +public class JdbcCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class); + public static final String PROPERTY_PREFIX = "jdbc."; + private static final String DATABASE_EXISTS_PROPERTY = "exists"; + private JdbcClientPool connections; + private String catalogName = "jdbc"; + private Map configuration; + private final String warehouse; + + protected JdbcCatalog( + FileIO fileIO, String catalogName, Map config, String warehouse) { + super(fileIO); + if (!StringUtils.isBlank(catalogName)) { + this.catalogName = catalogName; + } + this.configuration = config; + this.warehouse = warehouse; + Preconditions.checkNotNull(configuration, "Invalid catalog properties: null"); + this.connections = + new JdbcClientPool( + Integer.parseInt( + config.getOrDefault( + CatalogOptions.CLIENT_POOL_SIZE.key(), + CatalogOptions.CLIENT_POOL_SIZE.defaultValue().toString())), + configuration.get(CatalogOptions.URI.key()), + configuration); + try { + initializeCatalogTablesIfNeed(); + } catch (SQLException e) { + throw new RuntimeException("Cannot initialize JDBC catalog", e); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted in call to initialize", e); + } + } + + /** Initialize catalog tables. */ + private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedException { + boolean initializeCatalogTables = + Boolean.parseBoolean( + configuration.getOrDefault(INITIALIZE_CATALOG_TABLES.key(), "false")); + String uri = configuration.get(CatalogOptions.URI.key()); + Preconditions.checkNotNull(uri, "JDBC connection URI is required"); + if (initializeCatalogTables) { + // Check and create catalog table. 
+ connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute(); + }); + + // Check and create database properties table. + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables( + null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE) + .execute(); + }); + + // Check and create distributed lock table. + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + ResultSet tableExists = + dbMeta.getTables( + null, null, JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME, null); + if (tableExists.next()) { + return true; + } + return conn.prepareStatement(JdbcUtils.CREATE_DISTRIBUTED_LOCK_TABLE_SQL) + .execute(); + }); + } + } + + @Override + public String warehouse() { + return warehouse; + } + + @Override + public List listDatabases() { + List databases = Lists.newArrayList(); + databases.addAll( + fetch( + row -> row.getString(JdbcUtils.TABLE_DATABASE), + JdbcUtils.LIST_ALL_TABLE_DATABASES_SQL, + catalogName)); + + databases.addAll( + fetch( + row -> row.getString(JdbcUtils.DATABASE_NAME), + JdbcUtils.LIST_ALL_PROPERTY_DATABASES_SQL, + catalogName)); + return databases; + } + + @Override + protected boolean databaseExistsImpl(String databaseName) { + return JdbcUtils.databaseExists(connections, catalogName, databaseName); + } + + @Override + protected Map loadDatabasePropertiesImpl(String databaseName) { + if (!databaseExists(databaseName)) { + throw new RuntimeException(String.format("Database does not exist: %s", databaseName)); + } + Map properties = Maps.newHashMap(); + properties.putAll(fetchProperties(databaseName)); + if (!properties.containsKey(DB_LOCATION_PROP)) { + properties.put(DB_LOCATION_PROP, newDatabasePath(databaseName).getName()); + } + properties.remove(DATABASE_EXISTS_PROPERTY); + return ImmutableMap.copyOf(properties); + } + + @Override + protected void createDatabaseImpl(String name, Map properties) { + if (databaseExists(name)) { + throw new RuntimeException(String.format("Database already exists: %s", name)); + } + + Map createProps = new HashMap<>(); + createProps.put(DATABASE_EXISTS_PROPERTY, "true"); + if (properties != null && !properties.isEmpty()) { + createProps.putAll(properties); + } + + if (!createProps.containsKey(DB_LOCATION_PROP)) { + Path databasePath = newDatabasePath(name); + createProps.put(DB_LOCATION_PROP, databasePath.toString()); + } + insertProperties(connections, catalogName, name, createProps); + } + + @Override + protected void dropDatabaseImpl(String name) { + // Delete table from paimon_tables + execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogName, name); + // Delete properties from paimon_database_properties + execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogName, name); + } + + @Override + protected List listTablesImpl(String databaseName) { + if (!databaseExists(databaseName)) { + throw new RuntimeException(String.format("Database does not exist: %s", databaseName)); + } + return fetch( + row -> row.getString(JdbcUtils.TABLE_NAME), + JdbcUtils.LIST_TABLES_SQL, + catalogName, + databaseName); + } + + @Override + protected void dropTableImpl(Identifier identifier) { + try { + int 
deletedRecords = + execute( + connections, + JdbcUtils.DROP_TABLE_SQL, + catalogName, + identifier.getDatabaseName(), + identifier.getObjectName()); + + if (deletedRecords == 0) { + LOG.info("Skipping drop, table does not exist: {}", identifier); + return; + } + Path path = getDataTableLocation(identifier); + try { + if (fileIO.exists(path)) { + fileIO.deleteDirectoryQuietly(path); + } + } catch (Exception ex) { + LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex); + } + } catch (Exception e) { + throw new RuntimeException("Failed to drop table " + identifier.getFullName(), e); + } + } + + @Override + protected void createTableImpl(Identifier identifier, Schema schema) { + try { + // create table file + getSchemaManager(identifier).createTable(schema); + // Update schema metadata + Path path = getDataTableLocation(identifier); + int insertRecord = + connections.run( + conn -> { + try (PreparedStatement sql = + conn.prepareStatement( + JdbcUtils.DO_COMMIT_CREATE_TABLE_SQL)) { + sql.setString(1, catalogName); + sql.setString(2, identifier.getDatabaseName()); + sql.setString(3, identifier.getObjectName()); + sql.setString(4, path.toString()); + return sql.executeUpdate(); + } + }); + if (insertRecord == 1) { + LOG.debug("Successfully committed to new table: {}", identifier); + } else { + try { + fileIO.deleteDirectoryQuietly(path); + } catch (Exception ee) { + LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + } + throw new RuntimeException( + String.format( + "Failed to create table %s in catalog %s", + identifier.getFullName(), catalogName)); + } + } catch (Exception e) { + throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); + } + } + + @Override + protected void renameTableImpl(Identifier fromTable, Identifier toTable) { + try { + // update table metadata info + updateTable(connections, catalogName, fromTable, toTable); + + Path fromPath = getDataTableLocation(fromTable); + if (new SchemaManager(fileIO, fromPath).listAllIds().size() > 0) { + // Rename the file system's table directory. Maintain consistency between tables in + // the file system and tables in the Hive Metastore. 
+ Path toPath = getDataTableLocation(toTable); + try { + fileIO.rename(fromPath, toPath); + } catch (IOException e) { + throw new RuntimeException( + "Failed to rename changes of table " + + toTable.getFullName() + + " to underlying files.", + e); + } + // Update table metadata + updateTableMetadataLocation( + connections, catalogName, toTable, toPath.toString(), fromPath.toString()); + } + } catch (Exception e) { + throw new RuntimeException("Failed to rename table " + fromTable.getFullName(), e); + } + } + + @Override + protected void alterTableImpl(Identifier identifier, List changes) + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + if (!tableExists(identifier)) { + throw new RuntimeException( + String.format("Table is not exists {}", identifier.getFullName())); + } + final SchemaManager schemaManager = getSchemaManager(identifier); + // first commit changes to underlying files + TableSchema schema = schemaManager.commitChanges(changes); + try { + String newMetadataLocation = getDataTableLocation(identifier).toString(); + Map tableMetadata = + JdbcUtils.getTable( + connections, + catalogName, + identifier.getDatabaseName(), + identifier.getObjectName()); + String oldMetadataLocation = tableMetadata.get(JdbcUtils.METADATA_LOCATION_PROP); + if (!newMetadataLocation.equals(oldMetadataLocation)) { + updateTableMetadataLocation( + connections, + catalogName, + identifier, + newMetadataLocation, + oldMetadataLocation); + } + } catch (Exception te) { + schemaManager.deleteSchema(schema.id()); + throw new RuntimeException(te); + } + } + + @Override + protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + if (!tableExists(identifier)) { + throw new TableNotExistException(identifier); + } + Path tableLocation = getDataTableLocation(identifier); + return new SchemaManager(fileIO, tableLocation) + .latest() + .orElseThrow( + () -> new RuntimeException("There is no paimon table in " + tableLocation)); + } + + @Override + public boolean tableExists(Identifier identifier) { + if (isSystemTable(identifier)) { + return super.tableExists(identifier); + } + return JdbcUtils.tableExists( + connections, catalogName, identifier.getDatabaseName(), identifier.getObjectName()); + } + + @Override + public boolean caseSensitive() { + return false; + } + + @Override + public Optional lockFactory() { + return lockEnabled() + ? 
Optional.of( + JdbcCatalogLock.createFactory(connections, catalogName, configuration)) + : Optional.empty(); + } + + private boolean lockEnabled() { + return Boolean.parseBoolean( + configuration.getOrDefault( + LOCK_ENABLED.key(), LOCK_ENABLED.defaultValue().toString())); + } + + private Lock lock(Identifier identifier) { + if (!lockEnabled()) { + return new Lock.EmptyLock(); + } + JdbcCatalogLock lock = + new JdbcCatalogLock( + connections, + catalogName, + checkMaxSleep(configuration), + acquireTimeout(configuration)); + return Lock.fromCatalog(lock, identifier); + } + + @Override + public void close() throws Exception { + if (!connections.isClosed()) { + connections.close(); + } + } + + private SchemaManager getSchemaManager(Identifier identifier) { + return new SchemaManager(fileIO, getDataTableLocation(identifier)) + .withLock(lock(identifier)); + } + + private Map fetchProperties(String databaseName) { + if (!databaseExists(databaseName)) { + throw new RuntimeException(String.format("Database does not exist: %s", databaseName)); + } + List> entries = + fetch( + row -> + new AbstractMap.SimpleImmutableEntry<>( + row.getString(JdbcUtils.DATABASE_PROPERTY_KEY), + row.getString(JdbcUtils.DATABASE_PROPERTY_VALUE)), + JdbcUtils.GET_ALL_DATABASE_PROPERTIES_SQL, + catalogName, + databaseName); + return ImmutableMap.builder().putAll(entries).build(); + } + + @FunctionalInterface + interface RowProducer { + R apply(ResultSet result) throws SQLException; + } + + @SuppressWarnings("checkstyle:NestedTryDepth") + private List fetch(RowProducer toRow, String sql, String... args) { + try { + return connections.run( + conn -> { + List result = Lists.newArrayList(); + try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { + for (int pos = 0; pos < args.length; pos += 1) { + preparedStatement.setString(pos + 1, args[pos]); + } + try (ResultSet rs = preparedStatement.executeQuery()) { + while (rs.next()) { + result.add(toRow.apply(rs)); + } + } + } + return result; + }); + } catch (SQLException e) { + throw new RuntimeException(String.format("Failed to execute query: %s", sql), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java new file mode 100644 index 0000000000000..899f5dc475c95 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; + +/** Factory to create {@link JdbcCatalog}. */ +public class JdbcCatalogFactory implements CatalogFactory { + + public static final String IDENTIFIER = "jdbc"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Catalog create(FileIO fileIO, Path warehouse, CatalogContext context) { + return new JdbcCatalog(fileIO, null, context.options().toMap(), warehouse.getName()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java new file mode 100644 index 0000000000000..06c123df8f193 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogLock.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.CatalogLock; +import org.apache.paimon.utils.TimeUtils; + +import java.io.IOException; +import java.sql.SQLException; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.Callable; + +import static org.apache.paimon.options.CatalogOptions.LOCK_ACQUIRE_TIMEOUT; +import static org.apache.paimon.options.CatalogOptions.LOCK_CHECK_MAX_SLEEP; + +/** Jdbc catalog lock. 
*/ +public class JdbcCatalogLock implements CatalogLock { + private final JdbcClientPool connections; + private final long checkMaxSleep; + private final long acquireTimeout; + private final String catalogName; + + public JdbcCatalogLock( + JdbcClientPool connections, + String catalogName, + long checkMaxSleep, + long acquireTimeout) { + this.connections = connections; + this.checkMaxSleep = checkMaxSleep; + this.acquireTimeout = acquireTimeout; + this.catalogName = catalogName; + } + + @Override + public T runWithLock(String database, String table, Callable callable) throws Exception { + String lockUniqueName = String.format("%s.%s.%s", catalogName, database, table); + lock(lockUniqueName); + try { + return callable.call(); + } finally { + JdbcUtils.release(connections, lockUniqueName); + } + } + + private void lock(String lockUniqueName) throws SQLException, InterruptedException { + boolean lock = JdbcUtils.acquire(connections, lockUniqueName, acquireTimeout); + long nextSleep = 50; + long startRetry = System.currentTimeMillis(); + while (!lock) { + nextSleep *= 2; + if (nextSleep > checkMaxSleep) { + nextSleep = checkMaxSleep; + } + Thread.sleep(nextSleep); + lock = JdbcUtils.acquire(connections, lockUniqueName, acquireTimeout); + if (System.currentTimeMillis() - startRetry > acquireTimeout) { + break; + } + } + long retryDuration = System.currentTimeMillis() - startRetry; + if (!lock) { + throw new RuntimeException( + "Acquire lock failed with time: " + Duration.ofMillis(retryDuration)); + } + } + + @Override + public void close() throws IOException { + // Do nothing + } + + /** Create a jdbc lock factory. */ + public static CatalogLock.Factory createFactory( + JdbcClientPool connections, String catalogName, Map conf) { + return new JdbcCatalogLockFactory(connections, catalogName, conf); + } + + private static class JdbcCatalogLockFactory implements CatalogLock.Factory { + + private static final long serialVersionUID = 1L; + + private final JdbcClientPool connections; + private final String catalogName; + private final Map conf; + + public JdbcCatalogLockFactory( + JdbcClientPool connections, String catalogName, Map conf) { + this.connections = connections; + this.catalogName = catalogName; + this.conf = conf; + } + + @Override + public CatalogLock create() { + return new JdbcCatalogLock( + connections, catalogName, checkMaxSleep(conf), acquireTimeout(conf)); + } + } + + public static long checkMaxSleep(Map conf) { + return TimeUtils.parseDuration( + conf.getOrDefault( + LOCK_CHECK_MAX_SLEEP.key(), + TimeUtils.getStringInMillis(LOCK_CHECK_MAX_SLEEP.defaultValue()))) + .toMillis(); + } + + public static long acquireTimeout(Map conf) { + return TimeUtils.parseDuration( + conf.getOrDefault( + LOCK_ACQUIRE_TIMEOUT.key(), + TimeUtils.getStringInMillis(LOCK_ACQUIRE_TIMEOUT.defaultValue()))) + .toMillis(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java new file mode 100644 index 0000000000000..14109d08e452a --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalogOptions.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; + +/** Options for jdbc catalog. */ +public final class JdbcCatalogOptions { + public static final ConfigOption INITIALIZE_CATALOG_TABLES = + ConfigOptions.key("initialize-catalog-tables") + .booleanType() + .defaultValue(false) + .withDescription("Enable automatic table creation."); + + private JdbcCatalogOptions() {} +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java new file mode 100644 index 0000000000000..3bcf2d6d03efa --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcClientPool.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.ClientPoolImpl; +import org.apache.paimon.options.CatalogOptions; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.SQLNonTransientConnectionException; +import java.util.Map; +import java.util.Properties; + +/** Client pool for jdbc. 
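+ *
+ * <p>Pools {@link java.sql.Connection}s created through {@link java.sql.DriverManager};
+ * catalog options prefixed with {@code jdbc.} are passed to the driver as connection
+ * properties, and a {@link java.sql.SQLNonTransientConnectionException} is treated as a
+ * connection failure that triggers a single reconnect-and-retry in the inherited
+ * {@code run} methods.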
*/ +public class JdbcClientPool extends ClientPoolImpl { + + private final String dbUrl; + private final Map properties; + + public JdbcClientPool(String dbUrl, Map props) { + this( + Integer.parseInt( + props.getOrDefault( + CatalogOptions.CLIENT_POOL_SIZE.key(), + String.valueOf(CatalogOptions.CLIENT_POOL_SIZE.defaultValue()))), + dbUrl, + props); + } + + public JdbcClientPool(int poolSize, String dbUrl, Map props) { + super(poolSize, SQLNonTransientConnectionException.class, true); + properties = props; + this.dbUrl = dbUrl; + } + + @Override + protected Connection newClient() { + try { + Properties dbProps = + JdbcUtils.extractJdbcConfiguration(properties, JdbcCatalog.PROPERTY_PREFIX); + return DriverManager.getConnection(dbUrl, dbProps); + } catch (SQLException e) { + throw new RuntimeException(String.format("Failed to connect: %s", dbUrl), e); + } + } + + @Override + protected Connection reconnect(Connection client) { + close(client); + return newClient(); + } + + @Override + protected void close(Connection client) { + try { + client.close(); + } catch (SQLException e) { + throw new RuntimeException("Failed to close connection", e); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java new file mode 100644 index 0000000000000..231f4d44a2983 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -0,0 +1,646 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.jdbc; + +import org.apache.paimon.catalog.Identifier; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.Timestamp; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.function.Consumer; +import java.util.stream.Stream; + +/** Util for jdbc catalog. */ +public class JdbcUtils { + private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + public static final String METADATA_LOCATION_PROP = "metadata_location"; + public static final String PREVIOUS_METADATA_LOCATION_PROP = "previous_metadata_location"; + public static final String CATALOG_TABLE_NAME = "paimon_tables"; + public static final String CATALOG_NAME = "catalog_name"; + public static final String TABLE_DATABASE = "database_name"; + public static final String TABLE_NAME = "table_name"; + + static final String DO_COMMIT_SQL = + "UPDATE " + + CATALOG_TABLE_NAME + + " SET " + + METADATA_LOCATION_PROP + + " = ? 
, " + + PREVIOUS_METADATA_LOCATION_PROP + + " = ? " + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? AND " + + METADATA_LOCATION_PROP + + " = ?"; + static final String CREATE_CATALOG_TABLE = + "CREATE TABLE " + + CATALOG_TABLE_NAME + + "(" + + CATALOG_NAME + + " VARCHAR(255) NOT NULL," + + TABLE_DATABASE + + " VARCHAR(255) NOT NULL," + + TABLE_NAME + + " VARCHAR(255) NOT NULL," + + METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + PREVIOUS_METADATA_LOCATION_PROP + + " VARCHAR(1000)," + + " PRIMARY KEY (" + + CATALOG_NAME + + ", " + + TABLE_DATABASE + + ", " + + TABLE_NAME + + ")" + + ")"; + static final String GET_TABLE_SQL = + "SELECT * FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? "; + static final String LIST_TABLES_SQL = + "SELECT * FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ?"; + + static final String DELETE_TABLES_SQL = + "DELETE FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ?"; + static final String RENAME_TABLE_SQL = + "UPDATE " + + CATALOG_TABLE_NAME + + " SET " + + TABLE_DATABASE + + " = ? , " + + TABLE_NAME + + " = ? " + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? "; + static final String DROP_TABLE_SQL = + "DELETE FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? "; + static final String GET_DATABASE_SQL = + "SELECT " + + TABLE_DATABASE + + " FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + TABLE_DATABASE + + " = ? LIMIT 1"; + + static final String LIST_ALL_TABLE_DATABASES_SQL = + "SELECT DISTINCT " + + TABLE_DATABASE + + " FROM " + + CATALOG_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ?"; + static final String DO_COMMIT_CREATE_TABLE_SQL = + "INSERT INTO " + + CATALOG_TABLE_NAME + + " (" + + CATALOG_NAME + + ", " + + TABLE_DATABASE + + ", " + + TABLE_NAME + + ", " + + METADATA_LOCATION_PROP + + ", " + + PREVIOUS_METADATA_LOCATION_PROP + + ") " + + " VALUES (?,?,?,?,null)"; + + // Catalog database Properties + static final String DATABASE_PROPERTIES_TABLE_NAME = "paimon_database_properties"; + static final String DATABASE_NAME = "database_name"; + static final String DATABASE_PROPERTY_KEY = "property_key"; + static final String DATABASE_PROPERTY_VALUE = "property_value"; + + static final String CREATE_DATABASE_PROPERTIES_TABLE = + "CREATE TABLE " + + DATABASE_PROPERTIES_TABLE_NAME + + "(" + + CATALOG_NAME + + " VARCHAR(255) NOT NULL," + + DATABASE_NAME + + " VARCHAR(255) NOT NULL," + + DATABASE_PROPERTY_KEY + + " VARCHAR(255)," + + DATABASE_PROPERTY_VALUE + + " VARCHAR(1000)," + + "PRIMARY KEY (" + + CATALOG_NAME + + ", " + + DATABASE_NAME + + ", " + + DATABASE_PROPERTY_KEY + + ")" + + ")"; + static final String GET_DATABASE_PROPERTIES_SQL = + "SELECT " + + DATABASE_NAME + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + DATABASE_NAME + + " = ? 
"; + static final String INSERT_DATABASE_PROPERTIES_SQL = + "INSERT INTO " + + DATABASE_PROPERTIES_TABLE_NAME + + " (" + + CATALOG_NAME + + ", " + + DATABASE_NAME + + ", " + + DATABASE_PROPERTY_KEY + + ", " + + DATABASE_PROPERTY_VALUE + + ") VALUES "; + static final String INSERT_PROPERTIES_VALUES_BASE = "(?,?,?,?)"; + static final String GET_ALL_DATABASE_PROPERTIES_SQL = + "SELECT * " + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + DATABASE_NAME + + " = ? "; + static final String DELETE_DATABASE_PROPERTIES_SQL = + "DELETE FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + DATABASE_NAME + + " = ? AND " + + DATABASE_PROPERTY_KEY + + " IN "; + static final String DELETE_ALL_DATABASE_PROPERTIES_SQL = + "DELETE FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + DATABASE_NAME + + " = ?"; + static final String LIST_PROPERTY_DATABASES_SQL = + "SELECT DISTINCT " + + DATABASE_NAME + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ? AND " + + DATABASE_NAME + + " LIKE ?"; + static final String LIST_ALL_PROPERTY_DATABASES_SQL = + "SELECT DISTINCT " + + DATABASE_NAME + + " FROM " + + DATABASE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_NAME + + " = ?"; + + // Distributed locks table + static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; + static final String LOCK_NAME = "lock_name"; + static final String ACQUIRED_AT = "acquired_at"; + + static final String CREATE_DISTRIBUTED_LOCK_TABLE_SQL = + "CREATE TABLE " + + DISTRIBUTED_LOCKS_TABLE_NAME + + "(" + + LOCK_NAME + + " VARCHAR(1000) NOT NULL," + + ACQUIRED_AT + + " TIMESTAMP NULL DEFAULT NULL," + + "PRIMARY KEY (" + + LOCK_NAME + + ")" + + ")"; + + static final String DISTRIBUTED_LOCK_ACQUIRE_SQL = + "INSERT INTO " + + DISTRIBUTED_LOCKS_TABLE_NAME + + " (" + + LOCK_NAME + + ", " + + ACQUIRED_AT + + ") VALUES (?, ?)"; + + static final String DISTRIBUTED_LOCK_RELEASE_SQL = + "DELETE FROM " + DISTRIBUTED_LOCKS_TABLE_NAME + " WHERE " + LOCK_NAME + " = ?"; + + static final String DISTRIBUTED_LOCK_EXPIRE_CLEAR_SQL = + "DELETE FROM " + + DISTRIBUTED_LOCKS_TABLE_NAME + + " WHERE " + + LOCK_NAME + + " = ? AND " + + ACQUIRED_AT + + " < ?"; + + public static Properties extractJdbcConfiguration( + Map properties, String prefix) { + Properties result = new Properties(); + properties.forEach( + (key, value) -> { + if (key.startsWith(prefix)) { + result.put(key.substring(prefix.length()), value); + } + }); + return result; + } + + /** Get paimon table metadata. 
*/ + public static Map getTable( + JdbcClientPool connections, String catalogName, String databaseName, String tableName) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + Map table = Maps.newHashMap(); + + try (PreparedStatement sql = conn.prepareStatement(JdbcUtils.GET_TABLE_SQL)) { + sql.setString(1, catalogName); + sql.setString(2, databaseName); + sql.setString(3, tableName); + ResultSet rs = sql.executeQuery(); + if (rs.next()) { + table.put(CATALOG_NAME, rs.getString(CATALOG_NAME)); + table.put(TABLE_DATABASE, rs.getString(TABLE_DATABASE)); + table.put(TABLE_NAME, rs.getString(TABLE_NAME)); + table.put(METADATA_LOCATION_PROP, rs.getString(METADATA_LOCATION_PROP)); + table.put( + PREVIOUS_METADATA_LOCATION_PROP, + rs.getString(PREVIOUS_METADATA_LOCATION_PROP)); + } + rs.close(); + } + return table; + }); + } + + public static void updateTable( + JdbcClientPool connections, + String catalogName, + Identifier fromTable, + Identifier toTable) { + int updatedRecords = + execute( + err -> { + // SQLite doesn't set SQLState or throw + // SQLIntegrityConstraintViolationException + if (err instanceof SQLIntegrityConstraintViolationException + || (err.getMessage() != null + && err.getMessage().contains("constraint failed"))) { + throw new RuntimeException( + String.format("Table already exists: %s", toTable)); + } + }, + connections, + JdbcUtils.RENAME_TABLE_SQL, + toTable.getDatabaseName(), + toTable.getObjectName(), + catalogName, + fromTable.getDatabaseName(), + fromTable.getObjectName()); + + if (updatedRecords == 1) { + LOG.info("Renamed table from {}, to {}", fromTable, toTable); + } else if (updatedRecords == 0) { + throw new RuntimeException(String.format("Table does not exist: %s", fromTable)); + } else { + LOG.warn( + "Rename operation affected {} rows: the catalog table's primary key assumption has been violated", + updatedRecords); + } + } + + /** Update table metadata location. 
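+     *
+     * <p>Compare-and-swap on {@code metadata_location}: the UPDATE matches only while
+     * the stored location still equals {@code oldMetadataLocation}, so a concurrent
+     * commit drives the update count to 0 and this method throws a
+     * {@link RuntimeException} instead of silently overwriting.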
*/ + public static void updateTableMetadataLocation( + JdbcClientPool connections, + String catalogName, + Identifier identifier, + String newMetadataLocation, + String oldMetadataLocation) + throws SQLException, InterruptedException { + int updatedRecords = + connections.run( + conn -> { + try (PreparedStatement sql = + conn.prepareStatement(JdbcUtils.DO_COMMIT_SQL)) { + // UPDATE + sql.setString(1, newMetadataLocation); + sql.setString(2, oldMetadataLocation); + // WHERE + sql.setString(3, catalogName); + sql.setString(4, identifier.getDatabaseName()); + sql.setString(5, identifier.getObjectName()); + sql.setString(6, oldMetadataLocation); + return sql.executeUpdate(); + } + }); + if (updatedRecords == 1) { + LOG.debug("Successfully committed to existing table: {}", identifier.getFullName()); + } else { + throw new RuntimeException( + String.format( + "Failed to update table %s from catalog %s", + identifier.getFullName(), catalogName)); + } + } + + public static boolean databaseExists( + JdbcClientPool connections, String catalogName, String databaseName) { + + if (exists(connections, JdbcUtils.GET_DATABASE_SQL, catalogName, databaseName)) { + return true; + } + + if (exists(connections, JdbcUtils.GET_DATABASE_PROPERTIES_SQL, catalogName, databaseName)) { + return true; + } + return false; + } + + public static boolean tableExists( + JdbcClientPool connections, String catalogName, String databaseName, String tableName) { + if (exists(connections, JdbcUtils.GET_TABLE_SQL, catalogName, databaseName, tableName)) { + return true; + } + return false; + } + + @SuppressWarnings("checkstyle:NestedTryDepth") + private static boolean exists(JdbcClientPool connections, String sql, String... args) { + try { + return connections.run( + conn -> { + try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) { + for (int index = 0; index < args.length; index++) { + preparedStatement.setString(index + 1, args[index]); + } + try (ResultSet rs = preparedStatement.executeQuery()) { + if (rs.next()) { + return true; + } + } + } + return false; + }); + } catch (SQLException e) { + throw new RuntimeException(String.format("Failed to execute exists query: %s", sql), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } + + public static int execute(JdbcClientPool connections, String sql, String... args) { + return execute(err -> {}, connections, sql, args); + } + + public static int execute( + Consumer sqlErrorHandler, + JdbcClientPool connections, + String sql, + String... 
+    public static int execute(
+            Consumer<SQLException> sqlErrorHandler,
+            JdbcClientPool connections,
+            String sql,
+            String... args) {
+        try {
+            return connections.run(
+                    conn -> {
+                        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
+                            for (int pos = 0; pos < args.length; pos++) {
+                                preparedStatement.setString(pos + 1, args[pos]);
+                            }
+                            return preparedStatement.executeUpdate();
+                        }
+                    });
+        } catch (SQLException e) {
+            sqlErrorHandler.accept(e);
+            throw new RuntimeException(String.format("Failed to execute: %s", sql), e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted in SQL command", e);
+        }
+    }
+
+    public static boolean insertProperties(
+            JdbcClientPool connections,
+            String catalogName,
+            String databaseName,
+            Map<String, String> properties) {
+        String[] args =
+                properties.entrySet().stream()
+                        .flatMap(
+                                entry ->
+                                        Stream.of(
+                                                catalogName,
+                                                databaseName,
+                                                entry.getKey(),
+                                                entry.getValue()))
+                        .toArray(String[]::new);
+
+        int insertedRecords =
+                execute(connections, JdbcUtils.insertPropertiesStatement(properties.size()), args);
+        if (insertedRecords == properties.size()) {
+            return true;
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Failed to insert: %d of %d succeeded",
+                        insertedRecords, properties.size()));
+    }
+
+    public static boolean updateProperties(
+            JdbcClientPool connections,
+            String catalogName,
+            String databaseName,
+            Map<String, String> properties) {
+        Stream<String> caseArgs =
+                properties.entrySet().stream()
+                        .flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()));
+        Stream<String> whereArgs =
+                Stream.concat(Stream.of(catalogName, databaseName), properties.keySet().stream());
+        String[] args = Stream.concat(caseArgs, whereArgs).toArray(String[]::new);
+
+        int updatedRecords =
+                execute(connections, JdbcUtils.updatePropertiesStatement(properties.size()), args);
+        if (updatedRecords == properties.size()) {
+            return true;
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Failed to update: %d of %d succeeded", updatedRecords, properties.size()));
+    }
+
+    public static boolean deleteProperties(
+            JdbcClientPool connections,
+            String catalogName,
+            String databaseName,
+            Set<String> properties) {
+        String[] args =
+                Stream.concat(Stream.of(catalogName, databaseName), properties.stream())
+                        .toArray(String[]::new);
+
+        return execute(connections, JdbcUtils.deletePropertiesStatement(properties), args) > 0;
+    }
+
+    private static String insertPropertiesStatement(int size) {
+        StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_DATABASE_PROPERTIES_SQL);
+        for (int i = 0; i < size; i++) {
+            if (i != 0) {
+                sqlStatement.append(", ");
+            }
+            sqlStatement.append(JdbcUtils.INSERT_PROPERTIES_VALUES_BASE);
+        }
+        return sqlStatement.toString();
+    }
+
+    private static String deletePropertiesStatement(Set<String> properties) {
+        StringBuilder sqlStatement = new StringBuilder(JdbcUtils.DELETE_DATABASE_PROPERTIES_SQL);
+        String values =
+                String.join(",", Collections.nCopies(properties.size(), String.valueOf('?')));
+        sqlStatement.append("(").append(values).append(")");
+
+        return sqlStatement.toString();
+    }
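The statement builders expand to parameterized SQL whose placeholder count matches the flattened argument arrays built above, so one round trip can insert, update, or delete a whole property map. An illustration of the binding order for two properties; the table and column names shown in comments are assumptions, since the actual `INSERT_DATABASE_PROPERTIES_SQL` and `INSERT_PROPERTIES_VALUES_BASE` constants live outside this hunk:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PropertiesExample {
    // Illustration only; "my_catalog" and "db" are made-up names.
    static void insertTwoProperties(JdbcClientPool connections) {
        // LinkedHashMap keeps iteration (and therefore binding) order stable.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("owner", "alice");
        props.put("comment", "demo database");
        // insertPropertiesStatement(2) is assumed to expand to roughly:
        //   INSERT INTO database_properties
        //       (catalog_name, database_name, property_key, property_value)
        //   VALUES (?, ?, ?, ?), (?, ?, ?, ?)
        // and the flattened args become:
        //   "my_catalog", "db", "owner", "alice",
        //   "my_catalog", "db", "comment", "demo database"
        JdbcUtils.insertProperties(connections, "my_catalog", "db", props);
    }
}
```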
AND " + + DATABASE_PROPERTY_KEY + + " IN "); + + String values = String.join(",", Collections.nCopies(size, String.valueOf('?'))); + sqlStatement.append("(").append(values).append(")"); + return sqlStatement.toString(); + } + + public static boolean acquire(JdbcClientPool connections, String lockName, long timeout) + throws SQLException, InterruptedException { + // Check and clear expire lock + int affectedRows = tryClearExpireLock(connections, lockName, timeout); + if (affectedRows > 0) { + LOG.debug("Successfully cleared " + affectedRows + " lock records"); + } + return connections.run( + connection -> { + try (PreparedStatement preparedStatement = + connection.prepareStatement(DISTRIBUTED_LOCK_ACQUIRE_SQL)) { + preparedStatement.setString(1, lockName); + preparedStatement.setTimestamp( + 2, new Timestamp(System.currentTimeMillis())); + return preparedStatement.executeUpdate() > 0; + } catch (SQLException e) { + LOG.error("Try acquire lock failed.", e); + return false; + } + }); + } + + public static void release(JdbcClientPool connections, String lockName) + throws SQLException, InterruptedException { + connections.run( + connection -> { + try (PreparedStatement preparedStatement = + connection.prepareStatement(DISTRIBUTED_LOCK_RELEASE_SQL)) { + preparedStatement.setString(1, lockName); + return preparedStatement.executeUpdate() > 0; + } + }); + } + + private static int tryClearExpireLock(JdbcClientPool connections, String lockName, long timeout) + throws SQLException, InterruptedException { + long expirationTimeMillis = System.currentTimeMillis() - timeout * 1000; + Timestamp expirationTimestamp = new Timestamp(expirationTimeMillis); + return connections.run( + connection -> { + try (PreparedStatement preparedStatement = + connection.prepareStatement(DISTRIBUTED_LOCK_EXPIRE_CLEAR_SQL)) { + preparedStatement.setString(1, lockName); + preparedStatement.setTimestamp(2, expirationTimestamp); + return preparedStatement.executeUpdate(); + } + }); + } +} diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 485ba553145da..cc2b3f0631386 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -14,3 +14,4 @@ # limitations under the License. org.apache.paimon.catalog.FileSystemCatalogFactory +org.apache.paimon.jdbc.JdbcCatalogFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java new file mode 100644 index 0000000000000..5afa4d2601ea9 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
new file mode 100644
index 0000000000000..5afa4d2601ea9
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jdbc;
+
+import org.apache.paimon.catalog.CatalogTestBase;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.CatalogOptions;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link JdbcCatalog}. */
+public class JdbcCatalogTest extends CatalogTestBase {
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+        catalog = initCatalog("test-jdbc-catalog", Maps.newHashMap());
+    }
+
+    private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
+        Map<String, String> properties = Maps.newHashMap();
+        // Each test run gets its own in-memory SQLite database, named by a random suffix.
+        properties.put(
+                CatalogOptions.URI.key(),
+                "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
+        properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
+        properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
+        properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        properties.put(JdbcCatalogOptions.INITIALIZE_CATALOG_TABLES.key(), "true");
+        properties.putAll(props);
+        return new JdbcCatalog(fileIO, catalogName, properties, warehouse);
+    }
+
+    @Test
+    @Override
+    public void testListDatabasesWhenNoDatabases() {
+        List<String> databases = catalog.listDatabases();
+        assertThat(databases).isEmpty();
+    }
+
+    @Test
+    public void testCheckIdentifierUpperCase() throws Exception {
+        catalog.createDatabase("test_db", false);
+        assertThatThrownBy(
+                        () ->
+                                catalog.createTable(
+                                        Identifier.create("TEST_DB", "new_table"),
+                                        DEFAULT_TABLE_SCHEMA,
+                                        false))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Database name [TEST_DB] cannot contain upper case in the catalog.");
+
+        assertThatThrownBy(
+                        () ->
+                                catalog.createTable(
+                                        Identifier.create("test_db", "NEW_TABLE"),
+                                        DEFAULT_TABLE_SCHEMA,
+                                        false))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Table name [NEW_TABLE] cannot contain upper case in the catalog.");
+    }
+}
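The random suffix in the JDBC URI gives every test its own shared in-memory SQLite database, so catalog tables from parallel test runs cannot collide. A standalone sketch of the same trick, assuming the sqlite-jdbc driver is on the classpath as it is for the test above:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.UUID;

public class InMemorySqliteDemo {
    public static void main(String[] args) throws Exception {
        // Two URIs with different random suffixes name two independent databases.
        String url =
                "jdbc:sqlite:file::memory:?ic"
                        + UUID.randomUUID().toString().replace("-", "");
        try (Connection conn = DriverManager.getConnection(url)) {
            System.out.println("Opened isolated in-memory database: " + url);
        }
    }
}
```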
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink"), new OptionsClassLocation(