Support jdbc catalog
sunxiaojian committed Feb 22, 2024
1 parent d939739 commit 4fd21ac
Showing 17 changed files with 1,925 additions and 2 deletions.
183 changes: 183 additions & 0 deletions docs/content/how-to/creating-catalogs.md
@@ -30,6 +30,7 @@ Paimon catalogs currently support three types of metastores:

* `filesystem` metastore (default), which stores both metadata and table files in filesystems.
* `hive` metastore, which additionally stores metadata in Hive metastore. Users can directly access the tables from Hive.
* `jdbc` metastore, which additionally stores metadata in relational databases such as MySQL, Postgres, etc.

See [CatalogOptions]({{< ref "maintenance/configurations#catalogoptions" >}}) for detailed options when creating a catalog.

@@ -175,3 +176,185 @@ Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
## Creating a Catalog with Filesystem Metastore
{{< tabs "filesystem-metastore-example" >}}
{{< tab "Flink" >}}
The following Flink SQL registers and uses a Paimon catalog named `my_catalog`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
```sql
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///path/to/warehouse'
);
USE CATALOG my_catalog;
```
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
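For example, the following variant of the catalog definition (a sketch; `bucket` is a standard Paimon table option) makes every table created in the catalog default to 4 buckets:
```sql
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs:///path/to/warehouse',
    -- applied as the default 'bucket' option of every table
    -- created in this catalog, unless the table overrides it
    'table-default.bucket' = '4'
);
```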
{{< /tab >}}
{{< tab "Spark3" >}}
The following shell command registers a Paimon catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`.
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse
```
You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
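For example, adding one more `--conf` makes every table created through this catalog default to 4 buckets (a sketch; `bucket` is a standard Paimon table option):
```bash
spark-sql ... \
  --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
  --conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
  --conf spark.sql.catalog.paimon.table-default.bucket=4
```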
After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
{{< /tab >}}
{{< /tabs >}}
## Creating a Catalog with Hive Metastore
By using the Paimon Hive catalog, changes to the catalog will directly affect the corresponding Hive metastore. Tables created in such a catalog can also be accessed directly from Hive.
To use the Hive catalog, database names, table names, and field names should be **lowercase**.
{{< tabs "hive-metastore-example" >}}
{{< tab "Flink" >}}
The Paimon Hive catalog in Flink relies on the Flink Hive connector bundled jar. You should first download the Hive connector bundled jar and add it to the classpath.
| Metastore version | Bundle Name   | SQL Client JAR |
|:------------------|:--------------|:---------------|
| 2.3.0 - 3.1.3     | Flink Bundle  | [Download](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/hive/overview/#using-bundled-hive-jar) |
| 1.2.0 - x.x.x     | Presto Bundle | [Download](https://repo.maven.apache.org/maven2/com/facebook/presto/hive/hive-apache/1.2.2-2/hive-apache-1.2.2-2.jar) |
The following Flink SQL registers and uses a Paimon Hive catalog named `my_hive`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
If your Hive requires security authentication such as Kerberos, LDAP, or Ranger, or if you want Paimon tables to be managed
by Apache Atlas (by setting 'hive.metastore.event.listeners' in hive-site.xml), you can specify the `hive-conf-dir` and
`hadoop-conf-dir` parameters to point to the directory containing your hive-site.xml (and Hadoop configuration) files.
```sql
CREATE CATALOG my_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    -- 'uri' = 'thrift://<hive-metastore-host-name>:<port>', default use 'hive.metastore.uris' in HiveConf
    -- 'hive-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'hadoop-conf-dir' = '...', this is recommended in the kerberos environment
    -- 'warehouse' = 'hdfs:///path/to/warehouse', default use 'hive.metastore.warehouse.dir' in HiveConf
);
USE CATALOG my_hive;
```
You can define any default table options with the prefix `table-default.` for tables created in the catalog.
Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).
{{< /tab >}}
{{< tab "Spark3" >}}
Your Spark installation should be able to detect Hive dependencies, or should already contain them. See [here](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) for more information.
The following shell command registers a Paimon Hive catalog named `paimon`. Metadata and table files are stored under `hdfs:///path/to/warehouse`. In addition, metadata is also stored in Hive metastore.
```bash
spark-sql ... \
--conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
--conf spark.sql.catalog.paimon.warehouse=hdfs:///path/to/warehouse \
--conf spark.sql.catalog.paimon.metastore=hive \
--conf spark.sql.catalog.paimon.uri=thrift://<hive-metastore-host-name>:<port>
```
You can define any default table options with the prefix `spark.sql.catalog.paimon.table-default.` for tables created in the catalog.
After `spark-sql` is started, you can switch to the `default` database of the `paimon` catalog with the following SQL.
```sql
USE paimon.default;
```
Also, you can create [SparkGenericCatalog]({{< ref "engines/spark" >}}).
{{< /tab >}}
{{< /tabs >}}
> When using the Hive catalog to change incompatible column types through `ALTER TABLE`, you need to configure `hive.metastore.disallow.incompatible.col.type.changes=false`. See [HIVE-17832](https://issues.apache.org/jira/browse/HIVE-17832).
> If you are using Hive3, please disable Hive ACID:
>
> ```shell
> hive.strict.managed.tables=false
> hive.create.as.insert.only=false
> metastore.create.as.acid=false
> ```
### Setting Location in Properties
If you are using object storage and you don't want the location of Paimon tables/databases to be accessed by Hive's filesystem,
which may lead to errors such as "No FileSystem for scheme: s3a",
you can set the location in the properties of the table/database via the `location-in-properties` config. See
[setting the location of table/database in properties]({{< ref "maintenance/configurations#HiveCatalogOptions" >}}).
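A minimal sketch of such a catalog definition, assuming the `location-in-properties` key documented on the linked configuration page:
```sql
CREATE CATALOG my_hive WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'warehouse' = 's3a://mybucket/path/to/warehouse',
    -- keep table/database locations in properties instead of
    -- letting Hive's filesystem resolve the s3a path
    'location-in-properties' = 'true'
);
```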
### Synchronizing Partitions into Hive Metastore
By default, Paimon does not synchronize newly created partitions into Hive metastore. Users will see an unpartitioned table in Hive. Partition push-down will be carried out by filter push-down instead.
If you want to see a partitioned table in Hive and also synchronize newly created partitions into Hive metastore, please set the table property `metastore.partitioned-table` to true. Also see [CoreOptions]({{< ref "maintenance/configurations#CoreOptions" >}}).
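For example (a sketch; the table schema is illustrative):
```sql
CREATE TABLE my_hive.default.orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    dt STRING
) PARTITIONED BY (dt) WITH (
    -- synchronize newly created partitions into Hive metastore
    'metastore.partitioned-table' = 'true'
);
```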
### Adding Parameters to a Hive Table
Using the table option facilitates the convenient definition of Hive table parameters.
Parameters prefixed with `hive.` will be automatically defined in the `TBLPROPERTIES` of the Hive table.
For instance, using the option `hive.table.owner=Jon` will automatically add the parameter `table.owner=Jon` to the table properties during the creation process.
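For example, a table created as follows (a sketch; the schema is illustrative) ends up with `table.owner=Jon` in its Hive `TBLPROPERTIES`:
```sql
CREATE TABLE my_hive.default.events (
    id BIGINT,
    ts TIMESTAMP(3)
) WITH (
    -- written to the Hive table as TBLPROPERTIES ('table.owner' = 'Jon')
    'hive.table.owner' = 'Jon'
);
```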
## Creating a Catalog with JDBC Metastore
By using the Paimon JDBC catalog, changes to the catalog will be stored directly in relational databases such as MySQL, Postgres, etc.
{{< tabs "jdbc-metastore-example" >}}
{{< tab "Flink" >}}
The Paimon JDBC catalog in Flink needs the corresponding JDBC driver jar for the database it connects to, such as MySQL or Postgres. You should first download the JDBC connector jar and add it to the classpath.
| Database type | Bundle Name          | SQL Client JAR |
|:--------------|:---------------------|:---------------|
| mysql         | mysql-connector-java | [Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) |
| postgres      | postgresql           | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
```sql
CREATE CATALOG my_jdbc WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    -- 'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>',
    -- 'jdbc.user' = '...',
    -- 'jdbc.password' = '...',
    -- 'initialize-catalog-tables' = 'true',
    -- 'warehouse' = 'hdfs:///path/to/warehouse'
);
USE CATALOG my_jdbc;
```
You can define any connection parameters for a database with the prefix `jdbc.`.
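For example (a sketch; `useSSL` is a MySQL driver property, shown purely to illustrate the pass-through prefix):
```sql
CREATE CATALOG my_jdbc WITH (
    'type' = 'paimon',
    'metastore' = 'jdbc',
    'uri' = 'jdbc:mysql://<host>:<port>/<databaseName>',
    'jdbc.user' = 'paimon',
    'jdbc.password' = '******',
    -- any key prefixed with 'jdbc.' is passed to the JDBC driver
    'jdbc.useSSL' = 'false',
    'warehouse' = 'hdfs:///path/to/warehouse'
);
```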

You can define any default table options with the prefix `table-default.` for tables created in the catalog.

Also, you can create [FlinkGenericCatalog]({{< ref "engines/flink" >}}).

{{< /tab >}}

{{< /tabs >}}
6 changes: 6 additions & 0 deletions docs/content/maintenance/configurations.md
@@ -50,6 +50,12 @@ Options for Hive catalog.

{{< generated/hive_catalog_configuration >}}

### JdbcCatalogOptions

Options for JDBC catalog.

{{< generated/jdbc_catalog_configuration >}}

### FlinkCatalogOptions

Flink catalog options for Paimon.
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Configure the size of the connection pool.</td>
</tr>
<tr>
<td><h5>fs.allow-hadoop-fallback</h5></td>
<td style="word-wrap: break-word;">true</td>
@@ -60,7 +66,7 @@
<td><h5>metastore</h5></td>
<td style="word-wrap: break-word;">"filesystem"</td>
<td>String</td>
<td>Metastore of paimon catalog, supports filesystem and hive.</td>
<td>Metastore of paimon catalog, supports filesystem, hive and jdbc.</td>
</tr>
<tr>
<td><h5>table.type</h5></td>
36 changes: 36 additions & 0 deletions docs/layouts/shortcodes/generated/jdbc_catalog_configuration.html
@@ -0,0 +1,36 @@
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>initialize-catalog-tables</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Enable automatic table creation.</td>
</tr>
</tbody>
</table>
@@ -39,7 +39,8 @@ public class CatalogOptions {
ConfigOptions.key("metastore")
.stringType()
.defaultValue("filesystem")
.withDescription("Metastore of paimon catalog, supports filesystem and hive.");
.withDescription(
"Metastore of paimon catalog, supports filesystem、hive and jdbc.");

    public static final ConfigOption<String> URI =
            ConfigOptions.key("uri")
@@ -78,6 +79,12 @@ public class CatalogOptions {
                    .withDescription(
                            "Allow to fallback to hadoop File IO when no file io found for the scheme.");

    public static final ConfigOption<Integer> CLIENT_POOL_SIZE =
            key("client-pool-size")
                    .intType()
                    .defaultValue(2)
                    .withDescription("Configure the size of the connection pool.");

    public static final ConfigOption<String> LINEAGE_META =
            key("lineage-meta")
                    .stringType()
7 changes: 7 additions & 0 deletions paimon-core/pom.xml
@@ -183,6 +183,13 @@ under the License.
            <version>3.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.44.0.0</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
31 changes: 31 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/ClientPool.java
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon;

/** Defines a client connection pool. */
public interface ClientPool<C, E extends Exception> {

    /** An action to run with a client borrowed from the pool. */
    interface Action<R, C, E extends Exception> {
        R run(C client) throws E;
    }

    /** Runs the given action with a pooled client. */
    <R> R run(Action<R, C, E> action) throws E, InterruptedException;

    /** Runs the given action with a pooled client, optionally retrying on failure. */
    <R> R run(Action<R, C, E> action, boolean retry) throws E, InterruptedException;
}
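A caller hands the pool an action, and the pool lends a client for just that call. The following usage sketch shows the shape of that contract (the `ClientPoolExample` class, the `listTables` helper, and the `paimon_tables` query are illustrative assumptions, not part of this commit):
```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import org.apache.paimon.ClientPool;

public class ClientPoolExample {

    /** Lists table names through a pooled JDBC connection. */
    static List<String> listTables(ClientPool<Connection, SQLException> connections)
            throws SQLException, InterruptedException {
        // The pool lends a connection for the duration of the action
        // and reclaims it once the action returns.
        return connections.run(
                conn -> {
                    List<String> names = new ArrayList<>();
                    // 'paimon_tables' is an illustrative table name,
                    // not the catalog's actual backing schema.
                    try (Statement stmt = conn.createStatement();
                            ResultSet rs =
                                    stmt.executeQuery("SELECT table_name FROM paimon_tables")) {
                        while (rs.next()) {
                            names.add(rs.getString(1));
                        }
                        return names;
                    }
                });
    }
}
```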
